1. Enterprise Architecture Design

1.1 Distributed Deployment Architecture

1.1.1 Multi-Node Deployment

# enterprise_architecture.yml
# GoAccess enterprise deployment architecture configuration

architecture:
  deployment_type: "distributed"
  
  load_balancer:
    type: "nginx"
    instances: 2
    config:
      upstream_servers:
        - "goaccess-node-1:7890"
        - "goaccess-node-2:7890"
        - "goaccess-node-3:7890"
      health_check: true
      failover: true
  
  goaccess_nodes:
    - name: "goaccess-node-1"
      role: "primary"
      resources:
        cpu: "4 cores"
        memory: "8GB"
        storage: "500GB SSD"
      log_sources:
        - "/var/log/nginx/access.log"
        - "/var/log/apache2/access.log"
    
    - name: "goaccess-node-2"
      role: "secondary"
      resources:
        cpu: "4 cores"
        memory: "8GB"
        storage: "500GB SSD"
      log_sources:
        - "/var/log/nginx/api.log"
        - "/var/log/application/app.log"
    
    - name: "goaccess-node-3"
      role: "analytics"
      resources:
        cpu: "8 cores"
        memory: "16GB"
        storage: "1TB SSD"
      log_sources:
        - "/var/log/nginx/combined.log"
        - "/var/log/cdn/access.log"
  
  data_storage:
    type: "distributed"
    backend: "elasticsearch"
    replication_factor: 3
    sharding_strategy: "time_based"
  
  monitoring:
    prometheus:
      enabled: true
      scrape_interval: "30s"
    grafana:
      enabled: true
      dashboards: "custom"
    alerting:
      enabled: true
      channels: ["email", "slack", "webhook"]

network:
  security:
    ssl_enabled: true
    certificate_type: "letsencrypt"
    firewall_rules:
      - "allow 80/tcp"
      - "allow 443/tcp"
      - "allow 7890/tcp from internal"
  
  cdn:
    enabled: true
    provider: "cloudflare"
    caching_rules:
      static_assets: "1d"
      api_responses: "5m"

backup:
  strategy: "incremental"
  frequency: "daily"
  retention: "30 days"
  storage: "s3"

1.1.2 Docker Swarm Deployment

# docker-compose.swarm.yml
# Docker Swarm cluster deployment configuration

version: '3.8'

services:
  goaccess-primary:
    image: goaccess/goaccess:latest
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == manager
      resources:
        limits:
          cpus: '2.0'
          memory: 4G
        reservations:
          cpus: '1.0'
          memory: 2G
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
    volumes:
      - /var/log/nginx:/var/log/nginx:ro
      - goaccess-data:/data
      - ./config:/etc/goaccess
    environment:
      - GOACCESS_CONFIG_FILE=/etc/goaccess/goaccess.conf
      - GOACCESS_LOG_FORMAT=COMBINED
    networks:
      - goaccess-network
    ports:
      - "7890:7890"
  
  goaccess-worker:
    image: goaccess/goaccess:latest
    deploy:
      replicas: 3
      placement:
        constraints:
          - node.role == worker
      resources:
        limits:
          cpus: '1.5'
          memory: 3G
        reservations:
          cpus: '0.5'
          memory: 1G
    volumes:
      - /var/log/applications:/var/log/applications:ro
      - goaccess-worker-data:/data
      - ./config:/etc/goaccess
    environment:
      - GOACCESS_CONFIG_FILE=/etc/goaccess/goaccess.conf
      - GOACCESS_WORKER_MODE=true
    networks:
      - goaccess-network
  
  nginx-lb:
    image: nginx:alpine
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == manager
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
    ports:
      - "80:80"
      - "443:443"
    networks:
      - goaccess-network
    depends_on:
      - goaccess-primary
  
  redis-cluster:
    image: redis:alpine
    deploy:
      replicas: 3
      placement:
        max_replicas_per_node: 1
    command: redis-server --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
    volumes:
      - redis-data:/data
    networks:
      - goaccess-network
  
  prometheus:
    image: prom/prometheus:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus
    ports:
      - "9090:9090"
    networks:
      - goaccess-network
  
  grafana:
    image: grafana/grafana:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_INSTALL_PLUGINS=grafana-piechart-panel
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
      - ./grafana/datasources:/etc/grafana/provisioning/datasources:ro
    ports:
      - "3000:3000"
    networks:
      - goaccess-network

volumes:
  goaccess-data:
    driver: local
  goaccess-worker-data:
    driver: local
  redis-data:
    driver: local
  prometheus-data:
    driver: local
  grafana-data:
    driver: local

networks:
  goaccess-network:
    driver: overlay
    attachable: true
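
The stack is typically deployed with `docker stack deploy -c docker-compose.swarm.yml goaccess`. A minimal sketch for confirming that every service converged to its desired replica count, using the Docker SDK for Python (the stack name `goaccess` is an assumption):

#!/usr/bin/env python3
# swarm_stack_check.py - compare desired vs. running replicas for a deployed stack
# Assumes the stack was deployed under the name "goaccess"; run against a swarm manager.

import docker

client = docker.from_env()
STACK = "goaccess"

for service in client.services.list(filters={"label": f"com.docker.stack.namespace={STACK}"}):
    spec = service.attrs["Spec"]
    desired = spec.get("Mode", {}).get("Replicated", {}).get("Replicas", 1)
    running = sum(1 for t in service.tasks() if t["Status"]["State"] == "running")
    print(f"{spec['Name']}: {running}/{desired} replicas running")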

1.1.3 Kubernetes Deployment

# k8s-deployment.yml
# Kubernetes deployment configuration

apiVersion: v1
kind: Namespace
metadata:
  name: goaccess

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: goaccess-config
  namespace: goaccess
data:
  goaccess.conf: |
    time-format %H:%M:%S
    date-format %d/%b/%Y
    log-format COMBINED
    
    real-time-html true
    ws-url wss://goaccess.example.com:443/ws
    
    persist true
    restore true
    db-path /data/goaccess.db
    
    geoip-database /usr/share/GeoIP/GeoLite2-Country.mmdb
    
    html-prefs '{"theme":"bright","perPage":10}'
    html-custom-css /etc/goaccess/custom.css
    html-custom-js /etc/goaccess/custom.js

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: goaccess-primary
  namespace: goaccess
spec:
  replicas: 2
  selector:
    matchLabels:
      app: goaccess-primary
  template:
    metadata:
      labels:
        app: goaccess-primary
    spec:
      containers:
      - name: goaccess
        image: goaccess/goaccess:1.7
        ports:
        - containerPort: 7890
        volumeMounts:
        - name: config
          mountPath: /etc/goaccess
        - name: logs
          mountPath: /var/log/nginx
          readOnly: true
        - name: data
          mountPath: /data
        env:
        - name: GOACCESS_CONFIG_FILE
          value: "/etc/goaccess/goaccess.conf"
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 7890
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 7890
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: config
        configMap:
          name: goaccess-config
      - name: logs
        hostPath:
          path: /var/log/nginx
      - name: data
        persistentVolumeClaim:
          claimName: goaccess-data-pvc

---
apiVersion: v1
kind: Service
metadata:
  name: goaccess-service
  namespace: goaccess
spec:
  selector:
    app: goaccess-primary
  ports:
  - protocol: TCP
    port: 80
    targetPort: 7890
  type: ClusterIP

---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: goaccess-ingress
  namespace: goaccess
  annotations:
    kubernetes.io/ingress.class: nginx
    cert-manager.io/cluster-issuer: letsencrypt-prod
    nginx.ingress.kubernetes.io/websocket-services: goaccess-service
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
  tls:
  - hosts:
    - goaccess.example.com
    secretName: goaccess-tls
  rules:
  - host: goaccess.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: goaccess-service
            port:
              number: 80

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: goaccess-data-pvc
  namespace: goaccess
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: fast-ssd

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: goaccess-hpa
  namespace: goaccess
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: goaccess-primary
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80

1.1.4 Docker Swarm Deployment with Monitoring Stack

# docker-swarm.yml - Docker Swarm deployment with logging and monitoring services
version: '3.8'

services:
  goaccess:
    image: goaccess/goaccess:latest
    deploy:
      replicas: 3
      placement:
        constraints:
          - node.role == worker
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
        reservations:
          memory: 512M
          cpus: '0.25'
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
      update_config:
        parallelism: 1
        delay: 10s
        failure_action: rollback
    ports:
      - "7890:7890"
      - "8000:8000"
    volumes:
      - goaccess_data:/data
      - goaccess_config:/etc/goaccess
      - /var/log/nginx:/var/log/nginx:ro
    environment:
      - GOACCESS_CONFIG_FILE=/etc/goaccess/goaccess.conf
      - ENCRYPTION_KEY_FILE=/run/secrets/encryption_key
      - JWT_SECRET_FILE=/run/secrets/jwt_secret
    secrets:
      - encryption_key
      - jwt_secret
    networks:
      - goaccess_network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  redis:
    image: redis:7-alpine
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
      resources:
        limits:
          memory: 1G
          cpus: '0.5'
    volumes:
      - redis_data:/data
    networks:
      - goaccess_network
    command: redis-server --appendonly yes --maxmemory 512mb --maxmemory-policy allkeys-lru

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    deploy:
      replicas: 3
      placement:
        constraints:
          - node.role == worker
      resources:
        limits:
          memory: 4G
          cpus: '2.0'
        reservations:
          memory: 2G
          cpus: '1.0'
    environment:
      - cluster.name=goaccess-cluster
      - node.name={{.Node.Hostname}}
      - discovery.seed_hosts=elasticsearch
      - cluster.initial_master_nodes=node-1,node-2,node-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=false
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - goaccess_network
    ulimits:
      memlock:
        soft: -1
        hard: -1

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - SERVER_NAME=kibana
      - SERVER_HOST=0.0.0.0
    networks:
      - goaccess_network
    depends_on:
      - elasticsearch

  prometheus:
    image: prom/prometheus:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    ports:
      - "9090:9090"
    volumes:
      - prometheus_config:/etc/prometheus
      - prometheus_data:/prometheus
    networks:
      - goaccess_network
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'

  grafana:
    image: grafana/grafana:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana_data:/var/lib/grafana
      - grafana_config:/etc/grafana
    networks:
      - goaccess_network
    depends_on:
      - prometheus

  nginx:
    image: nginx:alpine
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == worker
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - nginx_config:/etc/nginx
      - nginx_logs:/var/log/nginx
      - ssl_certs:/etc/ssl/certs
    networks:
      - goaccess_network
    depends_on:
      - goaccess

volumes:
  goaccess_data:
    driver: local
  goaccess_config:
    driver: local
  redis_data:
    driver: local
  elasticsearch_data:
    driver: local
  prometheus_data:
    driver: local
  prometheus_config:
    driver: local
  grafana_data:
    driver: local
  grafana_config:
    driver: local
  nginx_config:
    driver: local
  nginx_logs:
    driver: local
  ssl_certs:
    driver: local

networks:
  goaccess_network:
    driver: overlay
    attachable: true

secrets:
  encryption_key:
    external: true
  jwt_secret:
    external: true
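
Both secrets are declared `external`, so they must exist before the stack is deployed (for example `docker secret create encryption_key ./encryption.key`). A sketch that creates them through the Docker SDK with freshly generated values (illustrative only; run against a swarm manager):

#!/usr/bin/env python3
# create_swarm_secrets.py - create the external secrets referenced by docker-swarm.yml
import secrets as token_gen
import docker

client = docker.from_env()

for name in ("encryption_key", "jwt_secret"):
    if any(s.name == name for s in client.secrets.list()):
        print(f"secret {name} already exists, skipping")
        continue
    client.secrets.create(name=name, data=token_gen.token_hex(32).encode())
    print(f"created secret {name}")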

1.1.5 Enterprise Monitoring Configuration

#!/usr/bin/env python3
# enterprise_monitoring.py - enterprise-grade monitoring system

import asyncio
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from enum import Enum
import aiohttp
import asyncpg
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import redis.asyncio as redis
from elasticsearch import AsyncElasticsearch
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import yaml
import psutil
import docker
from kubernetes import client, config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class AlertSeverity(Enum):
    """告警严重程度"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class MonitoringMetric(Enum):
    """监控指标类型"""
    CPU_USAGE = "cpu_usage"
    MEMORY_USAGE = "memory_usage"
    DISK_USAGE = "disk_usage"
    NETWORK_IO = "network_io"
    REQUEST_RATE = "request_rate"
    ERROR_RATE = "error_rate"
    RESPONSE_TIME = "response_time"
    ACTIVE_CONNECTIONS = "active_connections"
    QUEUE_SIZE = "queue_size"
    DATABASE_CONNECTIONS = "database_connections"

@dataclass
class Alert:
    """告警信息"""
    id: str
    timestamp: datetime
    severity: AlertSeverity
    metric: MonitoringMetric
    message: str
    value: float
    threshold: float
    source: str
    tags: Dict[str, str]
    resolved: bool = False
    resolved_at: Optional[datetime] = None

@dataclass
class MonitoringRule:
    """监控规则"""
    name: str
    metric: MonitoringMetric
    threshold: float
    operator: str  # >, <, >=, <=, ==, !=
    severity: AlertSeverity
    duration: int  # 持续时间(秒)
    enabled: bool = True
    tags: Dict[str, str] = None

class EnterpriseMonitoring:
    """企业级监控系统"""
    
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        self.redis_client = None
        self.es_client = None
        self.db_pool = None
        self.docker_client = None
        self.k8s_client = None
        
        # Prometheus指标
        self.metrics = {
            'requests_total': Counter('goaccess_requests_total', 'Total requests', ['method', 'status']),
            'request_duration': Histogram('goaccess_request_duration_seconds', 'Request duration'),
            'active_connections': Gauge('goaccess_active_connections', 'Active connections'),
            'cpu_usage': Gauge('goaccess_cpu_usage_percent', 'CPU usage percentage'),
            'memory_usage': Gauge('goaccess_memory_usage_bytes', 'Memory usage in bytes'),
            'disk_usage': Gauge('goaccess_disk_usage_percent', 'Disk usage percentage'),
            'error_rate': Gauge('goaccess_error_rate', 'Error rate'),
            'queue_size': Gauge('goaccess_queue_size', 'Queue size'),
            'alerts_total': Counter('goaccess_alerts_total', 'Total alerts', ['severity'])
        }
        
        self.monitoring_rules = self.load_monitoring_rules()
        self.active_alerts = {}
        self.alert_history = []
        
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return yaml.safe_load(f)
        except Exception as e:
            logger.error(f"加载监控配置失败: {e}")
            return {}
    
    def load_monitoring_rules(self) -> List[MonitoringRule]:
        """加载监控规则"""
        rules = []
        for rule_config in self.config.get('monitoring_rules', []):
            rule = MonitoringRule(
                name=rule_config['name'],
                metric=MonitoringMetric(rule_config['metric']),
                threshold=rule_config['threshold'],
                operator=rule_config['operator'],
                severity=AlertSeverity(rule_config['severity']),
                duration=rule_config.get('duration', 60),
                enabled=rule_config.get('enabled', True),
                tags=rule_config.get('tags', {})
            )
            rules.append(rule)
        return rules
    
    async def initialize(self):
        """初始化监控系统"""
        try:
            # 初始化Redis连接
            redis_config = self.config.get('redis', {})
            self.redis_client = redis.Redis(
                host=redis_config.get('host', 'localhost'),
                port=redis_config.get('port', 6379),
                db=redis_config.get('db', 0),
                decode_responses=True
            )
            
            # 初始化Elasticsearch连接
            es_config = self.config.get('elasticsearch', {})
            self.es_client = AsyncElasticsearch(
                hosts=es_config.get('hosts', ['localhost:9200'])
            )
            
            # 初始化数据库连接池
            db_config = self.config.get('database', {})
            if db_config:
                self.db_pool = await asyncpg.create_pool(
                    host=db_config.get('host', 'localhost'),
                    port=db_config.get('port', 5432),
                    user=db_config.get('user', 'postgres'),
                    password=db_config.get('password', ''),
                    database=db_config.get('database', 'monitoring'),
                    min_size=5,
                    max_size=20
                )
            
            # 初始化Docker客户端
            if self.config.get('docker', {}).get('enabled', False):
                self.docker_client = docker.from_env()
            
            # 初始化Kubernetes客户端
            if self.config.get('kubernetes', {}).get('enabled', False):
                try:
                    config.load_incluster_config()
                except config.ConfigException:
                    config.load_kube_config()
                self.k8s_client = client.CoreV1Api()
            
            # 启动Prometheus指标服务器
            prometheus_port = self.config.get('prometheus', {}).get('port', 8000)
            start_http_server(prometheus_port)
            
            logger.info("企业级监控系统初始化完成")
            
        except Exception as e:
            logger.error(f"监控系统初始化失败: {e}")
            raise
    
    async def collect_system_metrics(self) -> Dict[str, float]:
        """收集系统指标"""
        try:
            # CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            
            # 内存使用情况
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            memory_used = memory.used
            
            # 磁盘使用情况
            disk = psutil.disk_usage('/')
            disk_percent = disk.percent
            
            # 网络IO
            network = psutil.net_io_counters()
            network_bytes_sent = network.bytes_sent
            network_bytes_recv = network.bytes_recv
            
            # 更新Prometheus指标
            self.metrics['cpu_usage'].set(cpu_percent)
            self.metrics['memory_usage'].set(memory_used)
            self.metrics['disk_usage'].set(disk_percent)
            
            return {
                'cpu_usage': cpu_percent,
                'memory_usage_percent': memory_percent,
                'memory_usage_bytes': memory_used,
                'disk_usage': disk_percent,
                'network_bytes_sent': network_bytes_sent,
                'network_bytes_recv': network_bytes_recv
            }
            
        except Exception as e:
            logger.error(f"收集系统指标失败: {e}")
            return {}
    
    async def collect_goaccess_metrics(self) -> Dict[str, float]:
        """收集GoAccess指标"""
        try:
            metrics = {}
            
            # 从Redis获取实时指标
            if self.redis_client:
                request_rate = await self.redis_client.get('goaccess:request_rate')
                error_rate = await self.redis_client.get('goaccess:error_rate')
                active_connections = await self.redis_client.get('goaccess:active_connections')
                
                if request_rate:
                    metrics['request_rate'] = float(request_rate)
                if error_rate:
                    metrics['error_rate'] = float(error_rate)
                    self.metrics['error_rate'].set(float(error_rate))
                if active_connections:
                    metrics['active_connections'] = float(active_connections)
                    self.metrics['active_connections'].set(float(active_connections))
            
            # 从Elasticsearch获取历史指标
            if self.es_client:
                # 查询最近5分钟的平均响应时间
                query = {
                    "query": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-5m"
                            }
                        }
                    },
                    "aggs": {
                        "avg_response_time": {
                            "avg": {
                                "field": "response_time"
                            }
                        }
                    }
                }
                
                result = await self.es_client.search(
                    index="goaccess-*",
                    body=query
                )
                
                if result['aggregations']['avg_response_time']['value']:
                    avg_response_time = result['aggregations']['avg_response_time']['value']
                    metrics['response_time'] = avg_response_time
            
            return metrics
            
        except Exception as e:
            logger.error(f"收集GoAccess指标失败: {e}")
            return {}
    
    async def collect_docker_metrics(self) -> Dict[str, Any]:
        """收集Docker容器指标"""
        try:
            if not self.docker_client:
                return {}
            
            containers = self.docker_client.containers.list()
            metrics = {
                'total_containers': len(containers),
                'running_containers': len([c for c in containers if c.status == 'running']),
                'container_stats': []
            }
            
            for container in containers:
                if 'goaccess' in container.name:
                    stats = container.stats(stream=False)
                    
                    # CPU usage: container delta over system delta, scaled by the number of online CPUs
                    cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
                               stats['precpu_stats']['cpu_usage']['total_usage']
                    system_delta = stats['cpu_stats']['system_cpu_usage'] - \
                                  stats['precpu_stats']['system_cpu_usage']
                    online_cpus = stats['cpu_stats'].get('online_cpus', 1)
                    cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0 if system_delta > 0 else 0.0
                    
                    # Memory usage relative to the container limit
                    memory_usage = stats['memory_stats']['usage']
                    memory_limit = stats['memory_stats']['limit']
                    memory_percent = (memory_usage / memory_limit) * 100.0 if memory_limit > 0 else 0.0
                    
                    container_metrics = {
                        'name': container.name,
                        'status': container.status,
                        'cpu_percent': cpu_percent,
                        'memory_usage': memory_usage,
                        'memory_percent': memory_percent
                    }
                    
                    metrics['container_stats'].append(container_metrics)
            
            return metrics
            
        except Exception as e:
            logger.error(f"收集Docker指标失败: {e}")
            return {}
    
    async def collect_kubernetes_metrics(self) -> Dict[str, Any]:
        """收集Kubernetes指标"""
        try:
            if not self.k8s_client:
                return {}
            
            # 获取Pod信息
            pods = self.k8s_client.list_namespaced_pod(
                namespace=self.config.get('kubernetes', {}).get('namespace', 'default')
            )
            
            metrics = {
                'total_pods': len(pods.items),
                'running_pods': len([p for p in pods.items if p.status.phase == 'Running']),
                'pending_pods': len([p for p in pods.items if p.status.phase == 'Pending']),
                'failed_pods': len([p for p in pods.items if p.status.phase == 'Failed']),
                'pod_stats': []
            }
            
            for pod in pods.items:
                if 'goaccess' in pod.metadata.name:
                    pod_metrics = {
                        'name': pod.metadata.name,
                        'namespace': pod.metadata.namespace,
                        'phase': pod.status.phase,
                        'node': pod.spec.node_name,
                        'restart_count': sum([c.restart_count for c in pod.status.container_statuses or []])
                    }
                    metrics['pod_stats'].append(pod_metrics)
            
            return metrics
            
        except Exception as e:
            logger.error(f"收集Kubernetes指标失败: {e}")
            return {}
    
    def evaluate_rules(self, metrics: Dict[str, float]):
        """评估监控规则"""
        current_time = datetime.now()
        
        for rule in self.monitoring_rules:
            if not rule.enabled:
                continue
            
            metric_value = metrics.get(rule.metric.value)
            if metric_value is None:
                continue
            
            # 评估条件
            condition_met = False
            if rule.operator == '>':
                condition_met = metric_value > rule.threshold
            elif rule.operator == '<':
                condition_met = metric_value < rule.threshold
            elif rule.operator == '>=':
                condition_met = metric_value >= rule.threshold
            elif rule.operator == '<=':
                condition_met = metric_value <= rule.threshold
            elif rule.operator == '==':
                condition_met = metric_value == rule.threshold
            elif rule.operator == '!=':
                condition_met = metric_value != rule.threshold
            
            alert_key = f"{rule.name}_{rule.metric.value}"
            
            if condition_met:
                # 检查是否已存在活跃告警
                if alert_key not in self.active_alerts:
                    # 创建新告警
                    alert = Alert(
                        id=f"{alert_key}_{int(current_time.timestamp())}",
                        timestamp=current_time,
                        severity=rule.severity,
                        metric=rule.metric,
                        message=f"{rule.name}: {rule.metric.value} {rule.operator} {rule.threshold} (当前值: {metric_value})",
                        value=metric_value,
                        threshold=rule.threshold,
                        source="enterprise_monitoring",
                        tags=rule.tags or {}
                    )
                    
                    self.active_alerts[alert_key] = alert
                    self.alert_history.append(alert)
                    
                    # 更新Prometheus指标
                    self.metrics['alerts_total'].labels(severity=rule.severity.value).inc()
                    
                    # 发送告警
                    asyncio.create_task(self.send_alert(alert))
                    
                    logger.warning(f"触发告警: {alert.message}")
            else:
                # 检查是否需要解决告警
                if alert_key in self.active_alerts:
                    alert = self.active_alerts[alert_key]
                    alert.resolved = True
                    alert.resolved_at = current_time
                    
                    del self.active_alerts[alert_key]
                    
                    # 发送告警解决通知
                    asyncio.create_task(self.send_alert_resolved(alert))
                    
                    logger.info(f"告警已解决: {alert.message}")
    
    async def send_alert(self, alert: Alert):
        """发送告警通知"""
        try:
            # 发送邮件通知
            await self.send_email_alert(alert)
            
            # 发送到Slack/Teams
            await self.send_webhook_alert(alert)
            
            # 存储到数据库
            await self.store_alert(alert)
            
            # 发送到Elasticsearch
            await self.index_alert(alert)
            
        except Exception as e:
            logger.error(f"发送告警失败: {e}")
    
    async def send_email_alert(self, alert: Alert):
        """发送邮件告警"""
        try:
            email_config = self.config.get('notifications', {}).get('email', {})
            if not email_config.get('enabled', False):
                return
            
            smtp_server = email_config.get('smtp_server')
            smtp_port = email_config.get('smtp_port', 587)
            username = email_config.get('username')
            password = email_config.get('password')
            recipients = email_config.get('recipients', [])
            
            if not all([smtp_server, username, password, recipients]):
                return
            
            msg = MIMEMultipart()
            msg['From'] = username
            msg['To'] = ', '.join(recipients)
            msg['Subject'] = f"[{alert.severity.value.upper()}] GoAccess监控告警"
            
            body = f"""
告警详情:
- 告警ID: {alert.id}
- 时间: {alert.timestamp}
- 严重程度: {alert.severity.value}
- 指标: {alert.metric.value}
- 消息: {alert.message}
- 当前值: {alert.value}
- 阈值: {alert.threshold}
- 来源: {alert.source}
- 标签: {alert.tags}

请及时处理此告警。
            """
            
            msg.attach(MIMEText(body, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.starttls()
            server.login(username, password)
            server.send_message(msg)
            server.quit()
            
            logger.info(f"邮件告警已发送: {alert.id}")
            
        except Exception as e:
            logger.error(f"发送邮件告警失败: {e}")
    
    async def send_webhook_alert(self, alert: Alert):
        """发送Webhook告警"""
        try:
            webhook_config = self.config.get('notifications', {}).get('webhook', {})
            if not webhook_config.get('enabled', False):
                return
            
            webhook_url = webhook_config.get('url')
            if not webhook_url:
                return
            
            payload = {
                'alert_id': alert.id,
                'timestamp': alert.timestamp.isoformat(),
                'severity': alert.severity.value,
                'metric': alert.metric.value,
                'message': alert.message,
                'value': alert.value,
                'threshold': alert.threshold,
                'source': alert.source,
                'tags': alert.tags
            }
            
            async with aiohttp.ClientSession() as session:
                async with session.post(webhook_url, json=payload) as response:
                    if response.status == 200:
                        logger.info(f"Webhook告警已发送: {alert.id}")
                    else:
                        logger.error(f"Webhook告警发送失败: {response.status}")
                        
        except Exception as e:
            logger.error(f"发送Webhook告警失败: {e}")
    
    async def store_alert(self, alert: Alert):
        """存储告警到数据库"""
        try:
            if not self.db_pool:
                return
            
            async with self.db_pool.acquire() as conn:
                await conn.execute("""
                    INSERT INTO alerts (
                        id, timestamp, severity, metric, message, value, 
                        threshold, source, tags, resolved, resolved_at
                    ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                """, 
                    alert.id, alert.timestamp, alert.severity.value, 
                    alert.metric.value, alert.message, alert.value,
                    alert.threshold, alert.source, json.dumps(alert.tags),
                    alert.resolved, alert.resolved_at
                )
                
            logger.debug(f"告警已存储到数据库: {alert.id}")
            
        except Exception as e:
            logger.error(f"存储告警到数据库失败: {e}")
    
    async def index_alert(self, alert: Alert):
        """索引告警到Elasticsearch"""
        try:
            if not self.es_client:
                return
            
            doc = {
                'alert_id': alert.id,
                'timestamp': alert.timestamp,
                'severity': alert.severity.value,
                'metric': alert.metric.value,
                'message': alert.message,
                'value': alert.value,
                'threshold': alert.threshold,
                'source': alert.source,
                'tags': alert.tags,
                'resolved': alert.resolved,
                'resolved_at': alert.resolved_at
            }
            
            await self.es_client.index(
                index=f"goaccess-alerts-{alert.timestamp.strftime('%Y.%m')}",
                id=alert.id,
                body=doc
            )
            
            logger.debug(f"告警已索引到Elasticsearch: {alert.id}")
            
        except Exception as e:
            logger.error(f"索引告警到Elasticsearch失败: {e}")
    
    async def send_alert_resolved(self, alert: Alert):
        """发送告警解决通知"""
        try:
            # 更新数据库
            if self.db_pool:
                async with self.db_pool.acquire() as conn:
                    await conn.execute("""
                        UPDATE alerts 
                        SET resolved = $1, resolved_at = $2 
                        WHERE id = $3
                    """, True, alert.resolved_at, alert.id)
            
            # 更新Elasticsearch
            if self.es_client:
                await self.es_client.update(
                    index=f"goaccess-alerts-{alert.timestamp.strftime('%Y.%m')}",
                    id=alert.id,
                    body={
                        'doc': {
                            'resolved': True,
                            'resolved_at': alert.resolved_at
                        }
                    }
                )
            
            logger.info(f"告警解决通知已发送: {alert.id}")
            
        except Exception as e:
            logger.error(f"发送告警解决通知失败: {e}")
    
    async def generate_monitoring_report(self) -> Dict:
        """生成监控报告"""
        try:
            current_time = datetime.now()
            report_period = timedelta(hours=24)
            start_time = current_time - report_period
            
            # 收集当前指标
            system_metrics = await self.collect_system_metrics()
            goaccess_metrics = await self.collect_goaccess_metrics()
            docker_metrics = await self.collect_docker_metrics()
            k8s_metrics = await self.collect_kubernetes_metrics()
            
            # 统计告警信息
            recent_alerts = [alert for alert in self.alert_history 
                           if alert.timestamp >= start_time]
            
            alert_stats = {
                'total_alerts': len(recent_alerts),
                'by_severity': {},
                'by_metric': {},
                'resolved_alerts': len([a for a in recent_alerts if a.resolved]),
                'active_alerts': len(self.active_alerts)
            }
            
            for alert in recent_alerts:
                severity = alert.severity.value
                metric = alert.metric.value
                
                alert_stats['by_severity'][severity] = alert_stats['by_severity'].get(severity, 0) + 1
                alert_stats['by_metric'][metric] = alert_stats['by_metric'].get(metric, 0) + 1
            
            report = {
                'report_timestamp': current_time.isoformat(),
                'report_period': {
                    'start': start_time.isoformat(),
                    'end': current_time.isoformat(),
                    'duration_hours': 24
                },
                'system_metrics': system_metrics,
                'goaccess_metrics': goaccess_metrics,
                'docker_metrics': docker_metrics,
                'kubernetes_metrics': k8s_metrics,
                'alert_statistics': alert_stats,
                'active_alerts': [asdict(alert) for alert in self.active_alerts.values()],
                'monitoring_rules': {
                    'total_rules': len(self.monitoring_rules),
                    'enabled_rules': len([r for r in self.monitoring_rules if r.enabled]),
                    'disabled_rules': len([r for r in self.monitoring_rules if not r.enabled])
                },
                'health_status': self.get_overall_health_status()
            }
            
            return report
            
        except Exception as e:
            logger.error(f"生成监控报告失败: {e}")
            return {}
    
    def get_overall_health_status(self) -> str:
        """获取整体健康状态"""
        if any(alert.severity == AlertSeverity.CRITICAL for alert in self.active_alerts.values()):
            return "critical"
        elif any(alert.severity == AlertSeverity.ERROR for alert in self.active_alerts.values()):
            return "error"
        elif any(alert.severity == AlertSeverity.WARNING for alert in self.active_alerts.values()):
            return "warning"
        else:
            return "healthy"
    
    async def run_monitoring_loop(self):
        """运行监控循环"""
        logger.info("启动企业级监控循环")
        
        while True:
            try:
                # 收集所有指标
                system_metrics = await self.collect_system_metrics()
                goaccess_metrics = await self.collect_goaccess_metrics()
                
                # 合并指标
                all_metrics = {**system_metrics, **goaccess_metrics}
                
                # 评估监控规则
                self.evaluate_rules(all_metrics)
                
                # 等待下一次检查
                await asyncio.sleep(self.config.get('monitoring_interval', 60))
                
            except Exception as e:
                logger.error(f"监控循环错误: {e}")
                await asyncio.sleep(10)
    
    async def cleanup(self):
        """清理资源"""
        try:
            if self.redis_client:
                await self.redis_client.close()
            
            if self.es_client:
                await self.es_client.close()
            
            if self.db_pool:
                await self.db_pool.close()
            
            if self.docker_client:
                self.docker_client.close()
            
            logger.info("监控系统资源清理完成")
            
        except Exception as e:
            logger.error(f"清理监控系统资源失败: {e}")

# 使用示例
async def main():
    monitoring = EnterpriseMonitoring('monitoring_config.yaml')
    
    try:
        await monitoring.initialize()
        
        # 启动监控循环
        await monitoring.run_monitoring_loop()
        
    except KeyboardInterrupt:
        logger.info("收到停止信号")
    finally:
        await monitoring.cleanup()

if __name__ == '__main__':
    asyncio.run(main())
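
`EnterpriseMonitoring` reads every connection setting, rule and notification channel from a single YAML file whose schema is not shown above. The sketch below reconstructs a plausible `monitoring_config.yaml` from the keys the class actually reads; all hostnames and thresholds are placeholders:

#!/usr/bin/env python3
# generate_monitoring_config.py - write an example config matching the keys
# read by EnterpriseMonitoring (placeholder values; adjust per environment)
import yaml

example_config = {
    "monitoring_interval": 60,
    "prometheus": {"port": 8000},
    "redis": {"host": "redis", "port": 6379, "db": 0},
    "elasticsearch": {"hosts": ["http://elasticsearch:9200"]},
    "database": {"host": "postgres", "port": 5432, "user": "postgres",
                 "password": "change-me", "database": "monitoring"},
    "docker": {"enabled": True},
    "kubernetes": {"enabled": False, "namespace": "goaccess"},
    "monitoring_rules": [
        {"name": "high_cpu", "metric": "cpu_usage", "threshold": 85,
         "operator": ">", "severity": "warning", "duration": 120},
        {"name": "high_error_rate", "metric": "error_rate", "threshold": 0.05,
         "operator": ">", "severity": "critical", "duration": 60},
    ],
    "notifications": {
        "email": {"enabled": False},
        "webhook": {"enabled": True, "url": "https://hooks.example.com/alerts"},
    },
}

with open("monitoring_config.yaml", "w", encoding="utf-8") as f:
    yaml.safe_dump(example_config, f, allow_unicode=True, sort_keys=False)
print("monitoring_config.yaml written")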

6. Summary

This chapter walked through GoAccess's advanced application scenarios and enterprise deployment options, covering:

6.1 Topics Covered

  1. Enterprise architecture design

    • Distributed deployment architecture
    • High-availability configuration
    • Load-balancing strategies
  2. Large-scale log processing

    • Distributed log collection
    • Real-time stream processing
    • Data pipeline design
  3. Containerization and cloud native

    • Docker-based deployment
    • Kubernetes cluster management
    • Cloud service integration
  4. Security and compliance

    • Data security management
    • Compliance requirements
    • Audit logging
  5. Enterprise monitoring

    • End-to-end monitoring coverage
    • Intelligent alerting
    • Performance tuning recommendations

6.2 Best Practices

  1. Architecture design principles

    • High availability first
    • Horizontal scalability
    • Fault isolation
  2. Performance optimization strategies

    • Right-sized resource allocation
    • Cache strategy tuning
    • Data sharding
  3. Security measures

    • Encryption of data in transit
    • Access control
    • Security audit trails
  4. Operations standards

    • Automated deployment pipelines
    • Monitoring and alerting coverage
    • Incident response procedures

6.3 Future Directions

  1. Technology trends

    • Deeper cloud-native adoption
    • AI/ML-driven analysis
    • Edge computing support
  2. Planned feature extensions

    • Support for more data sources
    • Advanced analytics algorithms
    • Richer visualization
  3. Ecosystem development

    • A more complete plugin mechanism
    • Third-party integrations
    • Community contributions

After working through this chapter you should be able to design and implement an enterprise-grade GoAccess deployment, build a highly available and high-performance log analysis system, put a complete monitoring and alerting pipeline in place, and meet enterprise security and compliance requirements.

This concludes the GoAccess tutorial series. From basic installation to enterprise deployment, you now have an end-to-end view of how GoAccess can be applied; we hope it helps you build solid log analysis solutions in your own projects.

1.2 High Availability Configuration

1.2.1 Failover Mechanism

#!/usr/bin/env python3
# failover_manager.py - GoAccess failover manager

import time
import json
import logging
import requests
import subprocess
from typing import Dict, List, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class GoAccessNode:
    """GoAccess节点信息"""
    name: str
    host: str
    port: int
    role: str  # primary, secondary, backup
    health_endpoint: str
    priority: int
    last_health_check: Optional[datetime] = None
    is_healthy: bool = True
    consecutive_failures: int = 0

class FailoverManager:
    """GoAccess故障转移管理器"""
    
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        self.nodes = self.initialize_nodes()
        self.current_primary = self.get_primary_node()
        self.health_check_interval = self.config.get('health_check_interval', 30)
        self.failure_threshold = self.config.get('failure_threshold', 3)
        self.recovery_threshold = self.config.get('recovery_threshold', 2)
        self.running = False
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载配置文件失败: {e}")
            raise
    
    def initialize_nodes(self) -> List[GoAccessNode]:
        """初始化节点列表"""
        nodes = []
        for node_config in self.config.get('nodes', []):
            node = GoAccessNode(
                name=node_config['name'],
                host=node_config['host'],
                port=node_config['port'],
                role=node_config['role'],
                health_endpoint=node_config.get('health_endpoint', '/health'),
                priority=node_config.get('priority', 0)
            )
            nodes.append(node)
        
        # 按优先级排序
        nodes.sort(key=lambda x: x.priority, reverse=True)
        return nodes
    
    def get_primary_node(self) -> Optional[GoAccessNode]:
        """获取当前主节点"""
        for node in self.nodes:
            if node.role == 'primary' and node.is_healthy:
                return node
        return None
    
    def check_node_health(self, node: GoAccessNode) -> bool:
        """检查节点健康状态"""
        try:
            url = f"http://{node.host}:{node.port}{node.health_endpoint}"
            response = requests.get(url, timeout=5)
            
            if response.status_code == 200:
                # 检查响应内容
                try:
                    health_data = response.json()
                    if health_data.get('status') == 'healthy':
                        node.consecutive_failures = 0
                        node.last_health_check = datetime.now()
                        return True
                except ValueError:
                    # Response body was not valid JSON; fall through and treat as unhealthy
                    pass
            
            node.consecutive_failures += 1
            logger.warning(f"节点 {node.name} 健康检查失败 (连续失败: {node.consecutive_failures})")
            return False
            
        except Exception as e:
            node.consecutive_failures += 1
            logger.error(f"节点 {node.name} 健康检查异常: {e} (连续失败: {node.consecutive_failures})")
            return False
    
    def update_node_status(self, node: GoAccessNode, is_healthy: bool):
        """更新节点状态"""
        if is_healthy and not node.is_healthy:
            # 节点恢复
            if node.consecutive_failures == 0:
                node.is_healthy = True
                logger.info(f"节点 {node.name} 已恢复健康")
                self.on_node_recovery(node)
        
        elif not is_healthy and node.is_healthy:
            # 节点故障
            if node.consecutive_failures >= self.failure_threshold:
                node.is_healthy = False
                logger.error(f"节点 {node.name} 被标记为不健康")
                self.on_node_failure(node)
    
    def on_node_failure(self, failed_node: GoAccessNode):
        """处理节点故障"""
        logger.info(f"处理节点故障: {failed_node.name}")
        
        # 如果故障的是主节点,需要进行故障转移
        if failed_node.role == 'primary' or failed_node == self.current_primary:
            self.perform_failover(failed_node)
        
        # 发送告警
        self.send_alert({
            'type': 'node_failure',
            'node': failed_node.name,
            'timestamp': datetime.now().isoformat(),
            'consecutive_failures': failed_node.consecutive_failures
        })
    
    def on_node_recovery(self, recovered_node: GoAccessNode):
        """处理节点恢复"""
        logger.info(f"处理节点恢复: {recovered_node.name}")
        
        # If the recovered node outranks the current primary, consider failing back to it
        if (self.current_primary is not None and
                recovered_node.priority > self.current_primary.priority and
                recovered_node.role in ['primary', 'secondary']):
            self.consider_failback(recovered_node)
        
        # 发送恢复通知
        self.send_alert({
            'type': 'node_recovery',
            'node': recovered_node.name,
            'timestamp': datetime.now().isoformat()
        })
    
    def perform_failover(self, failed_node: GoAccessNode):
        """执行故障转移"""
        logger.info(f"开始故障转移,故障节点: {failed_node.name}")
        
        # 找到最佳的备用节点
        backup_node = self.find_best_backup_node()
        
        if not backup_node:
            logger.error("没有可用的备用节点进行故障转移")
            self.send_alert({
                'type': 'failover_failed',
                'reason': 'no_backup_available',
                'timestamp': datetime.now().isoformat()
            })
            return
        
        # 执行故障转移步骤
        try:
            # 1. 停止故障节点的流量
            self.stop_traffic_to_node(failed_node)
            
            # 2. 启动备用节点
            self.promote_backup_node(backup_node)
            
            # 3. 更新负载均衡器配置
            self.update_load_balancer(failed_node, backup_node)
            
            # 4. 更新当前主节点
            self.current_primary = backup_node
            
            logger.info(f"故障转移完成: {failed_node.name} -> {backup_node.name}")
            
            self.send_alert({
                'type': 'failover_success',
                'failed_node': failed_node.name,
                'new_primary': backup_node.name,
                'timestamp': datetime.now().isoformat()
            })
            
        except Exception as e:
            logger.error(f"故障转移失败: {e}")
            self.send_alert({
                'type': 'failover_failed',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })
    
    def find_best_backup_node(self) -> Optional[GoAccessNode]:
        """找到最佳的备用节点"""
        # 按优先级排序,选择健康的节点
        for node in sorted(self.nodes, key=lambda x: x.priority, reverse=True):
            if (node.is_healthy and 
                node.role in ['secondary', 'backup'] and 
                node != self.current_primary):
                return node
        return None
    
    def stop_traffic_to_node(self, node: GoAccessNode):
        """停止到指定节点的流量"""
        logger.info(f"停止到节点 {node.name} 的流量")
        
        # 这里可以调用负载均衡器的API来移除节点
        # 例如:Nginx upstream, HAProxy, 或云负载均衡器
        
        try:
            # 示例:调用Nginx Plus API
            if self.config.get('load_balancer', {}).get('type') == 'nginx_plus':
                self.remove_nginx_upstream_server(node)
            
            # 示例:调用HAProxy API
            elif self.config.get('load_balancer', {}).get('type') == 'haproxy':
                self.disable_haproxy_server(node)
            
            # 示例:更新云负载均衡器
            elif self.config.get('load_balancer', {}).get('type') == 'cloud':
                self.remove_cloud_lb_target(node)
                
        except Exception as e:
            logger.error(f"停止流量失败: {e}")
            raise
    
    def promote_backup_node(self, backup_node: GoAccessNode):
        """提升备用节点为主节点"""
        logger.info(f"提升节点 {backup_node.name} 为主节点")
        
        try:
            # 1. 更新节点角色
            backup_node.role = 'primary'
            
            # 2. 启动GoAccess服务(如果未运行)
            self.ensure_goaccess_running(backup_node)
            
            # 3. 同步数据(如果需要)
            self.sync_data_to_node(backup_node)
            
            # 4. 验证节点就绪
            if not self.verify_node_ready(backup_node):
                raise Exception(f"节点 {backup_node.name} 未能正确启动")
                
        except Exception as e:
            logger.error(f"提升备用节点失败: {e}")
            raise
    
    def update_load_balancer(self, old_node: GoAccessNode, new_node: GoAccessNode):
        """更新负载均衡器配置"""
        logger.info(f"更新负载均衡器: {old_node.name} -> {new_node.name}")
        
        try:
            # 添加新节点到负载均衡器
            if self.config.get('load_balancer', {}).get('type') == 'nginx_plus':
                self.add_nginx_upstream_server(new_node)
            elif self.config.get('load_balancer', {}).get('type') == 'haproxy':
                self.enable_haproxy_server(new_node)
            elif self.config.get('load_balancer', {}).get('type') == 'cloud':
                self.add_cloud_lb_target(new_node)
                
        except Exception as e:
            logger.error(f"更新负载均衡器失败: {e}")
            raise
    
    def consider_failback(self, recovered_node: GoAccessNode):
        """考虑故障回切"""
        # 检查是否启用自动回切
        if not self.config.get('auto_failback', False):
            logger.info(f"自动回切已禁用,节点 {recovered_node.name} 恢复但不会自动切换")
            return
        
        # 检查恢复节点的稳定性
        if self.is_node_stable(recovered_node):
            logger.info(f"执行故障回切到节点 {recovered_node.name}")
            self.perform_failback(recovered_node)
    
    def is_node_stable(self, node: GoAccessNode, stability_period: int = 300) -> bool:
        """检查节点是否稳定"""
        if not node.last_health_check:
            return False
        
        stable_time = datetime.now() - timedelta(seconds=stability_period)
        return (node.last_health_check >= stable_time and 
                node.consecutive_failures == 0)
    
    def perform_failback(self, target_node: GoAccessNode):
        """执行故障回切"""
        logger.info(f"开始故障回切到节点 {target_node.name}")
        
        try:
            # 执行与故障转移相似的步骤
            old_primary = self.current_primary
            
            # 1. 准备目标节点
            self.promote_backup_node(target_node)
            
            # 2. 更新负载均衡器
            self.update_load_balancer(old_primary, target_node)
            
            # 3. 降级旧主节点
            old_primary.role = 'secondary'
            
            # 4. 更新当前主节点
            self.current_primary = target_node
            
            logger.info(f"故障回切完成: {old_primary.name} -> {target_node.name}")
            
            self.send_alert({
                'type': 'failback_success',
                'old_primary': old_primary.name,
                'new_primary': target_node.name,
                'timestamp': datetime.now().isoformat()
            })
            
        except Exception as e:
            logger.error(f"故障回切失败: {e}")
            self.send_alert({
                'type': 'failback_failed',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })
    
    def ensure_goaccess_running(self, node: GoAccessNode):
        """确保GoAccess服务在节点上运行"""
        # 这里可以通过SSH或容器API启动GoAccess
        pass
    
    def sync_data_to_node(self, node: GoAccessNode):
        """同步数据到节点"""
        # 这里可以同步GoAccess数据库或配置
        pass
    
    def verify_node_ready(self, node: GoAccessNode) -> bool:
        """验证节点是否就绪"""
        return self.check_node_health(node)
    
    def send_alert(self, alert_data: Dict):
        """发送告警"""
        logger.info(f"发送告警: {alert_data}")
        
        # 这里可以集成各种告警渠道
        # 例如:邮件、Slack、钉钉、短信等
        
        alert_channels = self.config.get('alert_channels', [])
        
        for channel in alert_channels:
            try:
                if channel['type'] == 'webhook':
                    requests.post(channel['url'], json=alert_data, timeout=10)
                elif channel['type'] == 'email':
                    self.send_email_alert(channel, alert_data)
                elif channel['type'] == 'slack':
                    self.send_slack_alert(channel, alert_data)
            except Exception as e:
                logger.error(f"发送告警失败 ({channel['type']}): {e}")
    
    def send_email_alert(self, channel: Dict, alert_data: Dict):
        """发送邮件告警"""
        # 实现邮件发送逻辑
        pass
    
    def send_slack_alert(self, channel: Dict, alert_data: Dict):
        """发送Slack告警"""
        # 实现Slack消息发送逻辑
        pass
    
    def remove_nginx_upstream_server(self, node: GoAccessNode):
        """从Nginx upstream中移除服务器"""
        # 实现Nginx Plus API调用
        pass
    
    def add_nginx_upstream_server(self, node: GoAccessNode):
        """添加服务器到Nginx upstream"""
        # 实现Nginx Plus API调用
        pass
    
    def disable_haproxy_server(self, node: GoAccessNode):
        """禁用HAProxy服务器"""
        # 实现HAProxy API调用
        pass
    
    def enable_haproxy_server(self, node: GoAccessNode):
        """启用HAProxy服务器"""
        # 实现HAProxy API调用
        pass
    
    def remove_cloud_lb_target(self, node: GoAccessNode):
        """从云负载均衡器中移除目标"""
        # 实现云服务API调用
        pass
    
    def add_cloud_lb_target(self, node: GoAccessNode):
        """添加目标到云负载均衡器"""
        # 实现云服务API调用
        pass
    
    def run_health_checks(self):
        """运行健康检查循环"""
        self.running = True
        logger.info("开始健康检查循环")
        
        while self.running:
            try:
                # 并发检查所有节点
                with ThreadPoolExecutor(max_workers=len(self.nodes)) as executor:
                    futures = {}
                    
                    for node in self.nodes:
                        future = executor.submit(self.check_node_health, node)
                        futures[future] = node
                    
                    # 处理检查结果
                    for future in futures:
                        node = futures[future]
                        try:
                            is_healthy = future.result(timeout=10)
                            self.update_node_status(node, is_healthy)
                        except Exception as e:
                            logger.error(f"健康检查异常 ({node.name}): {e}")
                            self.update_node_status(node, False)
                
                # 等待下次检查
                time.sleep(self.health_check_interval)
                
            except KeyboardInterrupt:
                logger.info("收到中断信号,停止健康检查")
                break
            except Exception as e:
                logger.error(f"健康检查循环异常: {e}")
                time.sleep(5)
        
        self.running = False
        logger.info("健康检查循环已停止")
    
    def stop(self):
        """停止故障转移管理器"""
        self.running = False
    
    def get_cluster_status(self) -> Dict:
        """获取集群状态"""
        return {
            'current_primary': self.current_primary.name if self.current_primary else None,
            'nodes': [
                {
                    'name': node.name,
                    'host': node.host,
                    'port': node.port,
                    'role': node.role,
                    'is_healthy': node.is_healthy,
                    'consecutive_failures': node.consecutive_failures,
                    'last_health_check': node.last_health_check.isoformat() if node.last_health_check else None
                }
                for node in self.nodes
            ],
            'timestamp': datetime.now().isoformat()
        }

def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='GoAccess故障转移管理器')
    parser.add_argument('--config', '-c', required=True, help='配置文件路径')
    parser.add_argument('--status', action='store_true', help='显示集群状态')
    parser.add_argument('--daemon', action='store_true', help='以守护进程模式运行')
    
    args = parser.parse_args()
    
    # 创建故障转移管理器
    manager = FailoverManager(args.config)
    
    if args.status:
        # 显示集群状态
        status = manager.get_cluster_status()
        print(json.dumps(status, indent=2, ensure_ascii=False))
        return
    
    try:
        if args.daemon:
            # 守护进程模式(此处仍在前台运行健康检查循环,实际守护化可交由systemd等完成)
            manager.run_health_checks()
        else:
            # 交互模式
            print("GoAccess故障转移管理器已启动")
            print("按 Ctrl+C 停止")
            manager.run_health_checks()
    
    except KeyboardInterrupt:
        print("\n正在停止故障转移管理器...")
        manager.stop()
    except Exception as e:
        logger.error(f"故障转移管理器异常: {e}")
        raise

if __name__ == "__main__":
    main()
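
上述故障转移管理器既可以通过 --config 指定配置文件从命令行运行,也可以在其他Python代码中直接调用。下面是一个简要的调用示例(示意;模块名与配置文件路径均为假设,需按实际保存的文件名与环境调整):

# failover_usage_example.py - FailoverManager调用示例(示意)
import json
from failover_manager import FailoverManager  # 假设上述脚本保存为 failover_manager.py

manager = FailoverManager('failover_config.json')  # 配置文件路径仅为示例
print(json.dumps(manager.get_cluster_status(), indent=2, ensure_ascii=False))
manager.run_health_checks()  # 前台阻塞运行健康检查循环,Ctrl+C 停止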

2. 大规模日志处理

2.1 日志收集架构

2.1.1 分布式日志收集

#!/usr/bin/env python3
# distributed_log_collector.py - 分布式日志收集器

import os
import time
import json
import gzip
import hashlib
import logging
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from datetime import datetime, timedelta
from pathlib import Path
import asyncio
import aiofiles
import aiohttp
from kafka import KafkaProducer, KafkaConsumer
from redis import Redis
from elasticsearch import Elasticsearch

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class LogEntry:
    """日志条目"""
    timestamp: str
    source: str
    level: str
    message: str
    metadata: Dict
    raw_line: str
    hash_id: str

@dataclass
class LogSource:
    """日志源配置"""
    name: str
    path: str
    format: str
    encoding: str = 'utf-8'
    rotation_pattern: Optional[str] = None
    compression: bool = False
    tags: List[str] = None
    filters: List[str] = None

class DistributedLogCollector:
    """分布式日志收集器"""
    
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        self.log_sources = self.initialize_log_sources()
        self.kafka_producer = self.initialize_kafka_producer()
        self.redis_client = self.initialize_redis_client()
        self.es_client = self.initialize_elasticsearch_client()
        self.processing_stats = {
            'total_processed': 0,
            'total_errors': 0,
            'processing_rate': 0,
            'last_update': datetime.now()
        }
        self.running = False
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载配置文件失败: {e}")
            raise
    
    def initialize_log_sources(self) -> List[LogSource]:
        """初始化日志源"""
        sources = []
        for source_config in self.config.get('log_sources', []):
            source = LogSource(
                name=source_config['name'],
                path=source_config['path'],
                format=source_config['format'],
                encoding=source_config.get('encoding', 'utf-8'),
                rotation_pattern=source_config.get('rotation_pattern'),
                compression=source_config.get('compression', False),
                tags=source_config.get('tags', []),
                filters=source_config.get('filters', [])
            )
            sources.append(source)
        return sources
    
    def initialize_kafka_producer(self) -> Optional[KafkaProducer]:
        """初始化Kafka生产者"""
        kafka_config = self.config.get('kafka')
        if not kafka_config or not kafka_config.get('enabled', False):
            return None
        
        try:
            producer = KafkaProducer(
                bootstrap_servers=kafka_config['bootstrap_servers'],
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                acks=kafka_config.get('acks', 'all'),
                retries=kafka_config.get('retries', 3),
                batch_size=kafka_config.get('batch_size', 16384),
                linger_ms=kafka_config.get('linger_ms', 10),
                compression_type=kafka_config.get('compression_type', 'gzip')
            )
            logger.info("Kafka生产者初始化成功")
            return producer
        except Exception as e:
            logger.error(f"Kafka生产者初始化失败: {e}")
            return None
    
    def initialize_redis_client(self) -> Optional[Redis]:
        """初始化Redis客户端"""
        redis_config = self.config.get('redis')
        if not redis_config or not redis_config.get('enabled', False):
            return None
        
        try:
            client = Redis(
                host=redis_config['host'],
                port=redis_config['port'],
                db=redis_config.get('db', 0),
                password=redis_config.get('password'),
                decode_responses=True,
                socket_timeout=redis_config.get('timeout', 5)
            )
            # 测试连接
            client.ping()
            logger.info("Redis客户端初始化成功")
            return client
        except Exception as e:
            logger.error(f"Redis客户端初始化失败: {e}")
            return None
    
    def initialize_elasticsearch_client(self) -> Optional[Elasticsearch]:
        """初始化Elasticsearch客户端"""
        es_config = self.config.get('elasticsearch')
        if not es_config or not es_config.get('enabled', False):
            return None
        
        try:
            client = Elasticsearch(
                hosts=es_config['hosts'],
                timeout=es_config.get('timeout', 30),
                max_retries=es_config.get('max_retries', 3),
                retry_on_timeout=True
            )
            # 测试连接
            if client.ping():
                logger.info("Elasticsearch客户端初始化成功")
                return client
            else:
                logger.error("Elasticsearch连接测试失败")
                return None
        except Exception as e:
            logger.error(f"Elasticsearch客户端初始化失败: {e}")
            return None
    
    def parse_log_line(self, line: str, source: LogSource) -> Optional[LogEntry]:
        """解析日志行"""
        try:
            # 生成唯一ID
            hash_id = hashlib.md5(f"{source.name}:{line}".encode()).hexdigest()
            
            # 根据格式解析日志
            if source.format == 'nginx_combined':
                return self.parse_nginx_combined(line, source, hash_id)
            elif source.format == 'apache_combined':
                return self.parse_apache_combined(line, source, hash_id)
            elif source.format == 'json':
                return self.parse_json_log(line, source, hash_id)
            elif source.format == 'syslog':
                return self.parse_syslog(line, source, hash_id)
            else:
                # 通用解析
                return self.parse_generic_log(line, source, hash_id)
        
        except Exception as e:
            logger.error(f"解析日志行失败 ({source.name}): {e}")
            return None
    
    def parse_nginx_combined(self, line: str, source: LogSource, hash_id: str) -> LogEntry:
        """解析Nginx Combined格式日志"""
        import re
        
        # Nginx/Apache Combined日志格式正则表达式(响应大小字段可能为 "-")
        pattern = r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) (\S+) (\S+)" (\d+) (\S+) "([^"]*)" "([^"]*)"'
        match = re.match(pattern, line)
        
        if match:
            ip, timestamp, method, path, protocol, status, size, referer, user_agent = match.groups()
            
            return LogEntry(
                timestamp=timestamp,
                source=source.name,
                level='INFO',
                message=f"{method} {path} {status}",
                metadata={
                    'ip': ip,
                    'method': method,
                    'path': path,
                    'protocol': protocol,
                    'status_code': int(status),
                    'response_size': int(size) if size != '-' else 0,
                    'referer': referer if referer != '-' else None,
                    'user_agent': user_agent,
                    'tags': source.tags
                },
                raw_line=line,
                hash_id=hash_id
            )
        else:
            raise ValueError("无法解析Nginx Combined格式")
    
    def parse_apache_combined(self, line: str, source: LogSource, hash_id: str) -> LogEntry:
        """解析Apache Combined格式日志"""
        # 与Nginx格式类似
        return self.parse_nginx_combined(line, source, hash_id)
    
    def parse_json_log(self, line: str, source: LogSource, hash_id: str) -> LogEntry:
        """解析JSON格式日志"""
        try:
            data = json.loads(line)
            
            return LogEntry(
                timestamp=data.get('timestamp', datetime.now().isoformat()),
                source=source.name,
                level=data.get('level', 'INFO'),
                message=data.get('message', ''),
                metadata={
                    **data,
                    'tags': source.tags
                },
                raw_line=line,
                hash_id=hash_id
            )
        except json.JSONDecodeError:
            raise ValueError("无法解析JSON格式")
    
    def parse_syslog(self, line: str, source: LogSource, hash_id: str) -> LogEntry:
        """解析Syslog格式日志"""
        import re
        
        # Syslog格式正则表达式
        pattern = r'(\w{3} \d{2} \d{2}:\d{2}:\d{2}) (\S+) (\S+): (.*)'
        match = re.match(pattern, line)
        
        if match:
            timestamp, hostname, program, message = match.groups()
            
            return LogEntry(
                timestamp=timestamp,
                source=source.name,
                level='INFO',
                message=message,
                metadata={
                    'hostname': hostname,
                    'program': program,
                    'tags': source.tags
                },
                raw_line=line,
                hash_id=hash_id
            )
        else:
            raise ValueError("无法解析Syslog格式")
    
    def parse_generic_log(self, line: str, source: LogSource, hash_id: str) -> LogEntry:
        """解析通用格式日志"""
        return LogEntry(
            timestamp=datetime.now().isoformat(),
            source=source.name,
            level='INFO',
            message=line,
            metadata={
                'tags': source.tags
            },
            raw_line=line,
            hash_id=hash_id
        )
    
    def apply_filters(self, entry: LogEntry, source: LogSource) -> bool:
        """应用过滤器"""
        if not source.filters:
            return True
        
        for filter_expr in source.filters:
            try:
                # 简单的过滤器实现
                if filter_expr.startswith('status_code:'):
                    status_codes = filter_expr.split(':')[1].split(',')
                    if str(entry.metadata.get('status_code')) not in status_codes:
                        return False
                elif filter_expr.startswith('exclude_ip:'):
                    excluded_ips = filter_expr.split(':')[1].split(',')
                    if entry.metadata.get('ip') in excluded_ips:
                        return False
                elif filter_expr.startswith('include_path:'):
                    included_paths = filter_expr.split(':')[1].split(',')
                    path = entry.metadata.get('path', '')
                    if not any(included_path in path for included_path in included_paths):
                        return False
            except Exception as e:
                logger.error(f"应用过滤器失败: {e}")
        
        return True
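
    # 过滤器表达式示例(与上面的实现对应):
    #   "status_code:200,304"       仅保留状态码为200或304的条目
    #   "exclude_ip:127.0.0.1"      排除来自127.0.0.1的条目
    #   "include_path:/api,/admin"  仅保留路径包含/api或/admin的条目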
    
    async def process_log_file(self, source: LogSource) -> int:
        """处理日志文件"""
        processed_count = 0
        
        try:
            # 检查文件是否存在
            if not os.path.exists(source.path):
                logger.warning(f"日志文件不存在: {source.path}")
                return 0
            
            # 获取文件状态
            file_stat = os.stat(source.path)
            
            # 检查是否需要处理(基于文件修改时间)
            last_processed = await self.get_last_processed_time(source.name)
            if last_processed and file_stat.st_mtime <= last_processed:
                return 0
            
            # 打开文件(aiofiles按文本方式读取,暂不支持直接读取gzip压缩文件)
            if source.compression:
                logger.warning(f"压缩日志暂不支持异步读取,已跳过: {source.path}")
                return 0
            
            async with aiofiles.open(source.path, 'r', encoding=source.encoding) as f:
                # 如果有上次处理的位置,跳转到该位置
                last_position = await self.get_last_processed_position(source.name)
                if last_position:
                    await f.seek(last_position)
                
                async for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    
                    # 解析日志行
                    entry = self.parse_log_line(line, source)
                    if not entry:
                        continue
                    
                    # 应用过滤器
                    if not self.apply_filters(entry, source):
                        continue
                    
                    # 发送到各个目标
                    await self.send_to_targets(entry)
                    
                    processed_count += 1
                    
                    # 定期保存处理位置
                    if processed_count % 1000 == 0:
                        current_position = await f.tell()
                        await self.save_processed_position(source.name, current_position)
                
                # 保存最终处理位置和时间
                final_position = await f.tell()
                await self.save_processed_position(source.name, final_position)
                await self.save_processed_time(source.name, file_stat.st_mtime)
        
        except Exception as e:
            logger.error(f"处理日志文件失败 ({source.path}): {e}")
            self.processing_stats['total_errors'] += 1
        
        return processed_count
    
    async def send_to_targets(self, entry: LogEntry):
        """发送日志条目到各个目标"""
        entry_dict = asdict(entry)
        
        # 发送到Kafka
        if self.kafka_producer:
            try:
                topic = self.config['kafka'].get('topic', 'goaccess-logs')
                key = f"{entry.source}:{entry.hash_id}"
                self.kafka_producer.send(topic, value=entry_dict, key=key)
            except Exception as e:
                logger.error(f"发送到Kafka失败: {e}")
        
        # 发送到Redis
        if self.redis_client:
            try:
                key = f"goaccess:logs:{entry.source}:{entry.hash_id}"
                self.redis_client.setex(key, 3600, json.dumps(entry_dict, ensure_ascii=False))
                
                # 添加到实时队列
                queue_key = f"goaccess:queue:{entry.source}"
                self.redis_client.lpush(queue_key, json.dumps(entry_dict, ensure_ascii=False))
                self.redis_client.ltrim(queue_key, 0, 9999)  # 保持队列大小
            except Exception as e:
                logger.error(f"发送到Redis失败: {e}")
        
        # 发送到Elasticsearch
        if self.es_client:
            try:
                index_name = f"goaccess-logs-{datetime.now().strftime('%Y-%m-%d')}"
                self.es_client.index(
                    index=index_name,
                    id=entry.hash_id,
                    body=entry_dict
                )
            except Exception as e:
                logger.error(f"发送到Elasticsearch失败: {e}")
    
    async def get_last_processed_time(self, source_name: str) -> Optional[float]:
        """获取上次处理时间"""
        if self.redis_client:
            try:
                key = f"goaccess:processed_time:{source_name}"
                value = self.redis_client.get(key)
                return float(value) if value else None
            except Exception:
                pass
        return None
    
    async def save_processed_time(self, source_name: str, timestamp: float):
        """保存处理时间"""
        if self.redis_client:
            try:
                key = f"goaccess:processed_time:{source_name}"
                self.redis_client.set(key, timestamp)
            except Exception as e:
                logger.error(f"保存处理时间失败: {e}")
    
    async def get_last_processed_position(self, source_name: str) -> Optional[int]:
        """获取上次处理位置"""
        if self.redis_client:
            try:
                key = f"goaccess:processed_position:{source_name}"
                value = self.redis_client.get(key)
                return int(value) if value else None
            except Exception:
                pass
        return None
    
    async def save_processed_position(self, source_name: str, position: int):
        """保存处理位置"""
        if self.redis_client:
            try:
                key = f"goaccess:processed_position:{source_name}"
                self.redis_client.set(key, position)
            except Exception as e:
                logger.error(f"保存处理位置失败: {e}")
    
    async def run_collection_cycle(self):
        """运行收集周期"""
        start_time = time.time()
        total_processed = 0
        
        # 并发处理所有日志源
        tasks = []
        for source in self.log_sources:
            task = asyncio.create_task(self.process_log_file(source))
            tasks.append(task)
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 统计结果
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"处理日志源失败 ({self.log_sources[i].name}): {result}")
                self.processing_stats['total_errors'] += 1
            else:
                total_processed += result
        
        # 更新统计信息
        end_time = time.time()
        duration = end_time - start_time
        
        self.processing_stats['total_processed'] += total_processed
        self.processing_stats['processing_rate'] = total_processed / duration if duration > 0 else 0
        self.processing_stats['last_update'] = datetime.now()
        
        logger.info(f"收集周期完成: 处理 {total_processed} 条日志,耗时 {duration:.2f}s")
    
    async def run_continuous_collection(self):
        """运行连续收集"""
        self.running = True
        collection_interval = self.config.get('collection_interval', 60)
        
        logger.info(f"开始连续日志收集,间隔 {collection_interval}s")
        
        while self.running:
            try:
                await self.run_collection_cycle()
                await asyncio.sleep(collection_interval)
            except KeyboardInterrupt:
                logger.info("收到中断信号,停止收集")
                break
            except Exception as e:
                logger.error(f"收集周期异常: {e}")
                await asyncio.sleep(5)
        
        self.running = False
        logger.info("日志收集已停止")
    
    def stop(self):
        """停止收集器"""
        self.running = False
        
        # 关闭连接
        if self.kafka_producer:
            self.kafka_producer.close()
        
        if self.redis_client:
            self.redis_client.close()
        
        if self.es_client:
            self.es_client.close()
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        return {
            **self.processing_stats,
            'log_sources': len(self.log_sources),
            'kafka_enabled': self.kafka_producer is not None,
            'redis_enabled': self.redis_client is not None,
            'elasticsearch_enabled': self.es_client is not None
        }

async def main():
    import argparse
    
    parser = argparse.ArgumentParser(description='分布式日志收集器')
    parser.add_argument('--config', '-c', required=True, help='配置文件路径')
    parser.add_argument('--once', action='store_true', help='只运行一次收集周期')
    parser.add_argument('--stats', action='store_true', help='显示统计信息')
    
    args = parser.parse_args()
    
    # 创建收集器
    collector = DistributedLogCollector(args.config)
    
    if args.stats:
        # 显示统计信息
        stats = collector.get_stats()
        print(json.dumps(stats, indent=2, ensure_ascii=False, default=str))
        return
    
    try:
        if args.once:
            # 运行一次收集周期
            await collector.run_collection_cycle()
        else:
            # 连续收集
            await collector.run_continuous_collection()
    
    except KeyboardInterrupt:
        print("\n正在停止收集器...")
    finally:
        collector.stop()

if __name__ == "__main__":
    asyncio.run(main())
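
收集器通过 --config 指定的JSON文件进行配置。下面是一个最小化的示例配置(字段名取自上文代码中实际读取的键,取值仅作演示,需按实际环境调整):

# collector_config_example.py - 生成示例配置文件(示意)
import json

example_config = {
    "collection_interval": 60,
    "log_sources": [
        {
            "name": "nginx-access",
            "path": "/var/log/nginx/access.log",
            "format": "nginx_combined",
            "tags": ["web", "nginx"],
            "filters": ["exclude_ip:127.0.0.1"]
        }
    ],
    "kafka": {"enabled": True, "bootstrap_servers": ["localhost:9092"], "topic": "goaccess-logs"},
    "redis": {"enabled": True, "host": "localhost", "port": 6379, "db": 0},
    "elasticsearch": {"enabled": False, "hosts": ["http://localhost:9200"]}
}

with open("collector_config.json", "w", encoding="utf-8") as f:
    json.dump(example_config, f, ensure_ascii=False, indent=2)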

2.2 实时流处理

2.2.1 Kafka Streams处理

#!/usr/bin/env python3
# kafka_stream_processor.py - Kafka流处理器

import json
import time
import logging
from typing import Dict, List, Optional, Callable
from dataclasses import dataclass
from datetime import datetime, timedelta
from collections import defaultdict
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition
import threading
import queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class StreamRecord:
    """流记录"""
    key: str
    value: Dict
    timestamp: int
    partition: int
    offset: int

@dataclass
class WindowedRecord:
    """窗口记录"""
    window_start: datetime
    window_end: datetime
    key: str
    value: Dict
    count: int

class KafkaStreamProcessor:
    """Kafka流处理器"""
    
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        self.consumer = self.initialize_consumer()
        self.producer = self.initialize_producer()
        self.processors = {}
        self.windows = defaultdict(dict)
        self.running = False
        self.stats = {
            'messages_processed': 0,
            'messages_produced': 0,
            'processing_errors': 0,
            'last_update': datetime.now()
        }
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载配置文件失败: {e}")
            raise
    
    def initialize_consumer(self) -> KafkaConsumer:
        """初始化Kafka消费者"""
        kafka_config = self.config['kafka']
        
        consumer = KafkaConsumer(
            *kafka_config['input_topics'],
            bootstrap_servers=kafka_config['bootstrap_servers'],
            group_id=kafka_config['consumer_group'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset=kafka_config.get('auto_offset_reset', 'latest'),
            enable_auto_commit=kafka_config.get('enable_auto_commit', True),
            auto_commit_interval_ms=kafka_config.get('auto_commit_interval_ms', 5000),
            max_poll_records=kafka_config.get('max_poll_records', 500),
            session_timeout_ms=kafka_config.get('session_timeout_ms', 30000)
        )
        
        logger.info(f"Kafka消费者初始化成功,订阅主题: {kafka_config['input_topics']}")
        return consumer
    
    def initialize_producer(self) -> KafkaProducer:
        """初始化Kafka生产者"""
        kafka_config = self.config['kafka']
        
        producer = KafkaProducer(
            bootstrap_servers=kafka_config['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks=kafka_config.get('acks', 'all'),
            retries=kafka_config.get('retries', 3),
            batch_size=kafka_config.get('batch_size', 16384),
            linger_ms=kafka_config.get('linger_ms', 10),
            compression_type=kafka_config.get('compression_type', 'gzip')
        )
        
        logger.info("Kafka生产者初始化成功")
        return producer
    
    def register_processor(self, name: str, processor_func: Callable[[StreamRecord], Optional[Dict]]):
        """注册处理器"""
        self.processors[name] = processor_func
        logger.info(f"注册处理器: {name}")
    
    def process_access_log_aggregation(self, record: StreamRecord) -> Optional[Dict]:
        """处理访问日志聚合"""
        try:
            log_data = record.value
            
            # 提取关键字段
            ip = log_data.get('metadata', {}).get('ip')
            path = log_data.get('metadata', {}).get('path')
            status_code = log_data.get('metadata', {}).get('status_code')
            user_agent = log_data.get('metadata', {}).get('user_agent')
            # 时间戳可能为原始日志中的非ISO格式,解析失败时退回当前时间
            try:
                timestamp = datetime.fromisoformat(log_data.get('timestamp', ''))
            except (ValueError, TypeError):
                timestamp = datetime.now()
            
            # 生成聚合数据
            aggregations = []
            
            # IP访问统计
            if ip:
                aggregations.append({
                    'type': 'ip_stats',
                    'key': ip,
                    'timestamp': timestamp.isoformat(),
                    'metrics': {
                        'requests': 1,
                        'status_code': status_code,
                        'path': path
                    }
                })
            
            # 路径访问统计
            if path:
                aggregations.append({
                    'type': 'path_stats',
                    'key': path,
                    'timestamp': timestamp.isoformat(),
                    'metrics': {
                        'requests': 1,
                        'status_code': status_code,
                        'ip': ip
                    }
                })
            
            # 状态码统计
            if status_code:
                aggregations.append({
                    'type': 'status_stats',
                    'key': str(status_code),
                    'timestamp': timestamp.isoformat(),
                    'metrics': {
                        'requests': 1,
                        'path': path,
                        'ip': ip
                    }
                })
            
            # 用户代理统计
            if user_agent:
                # 简化用户代理字符串
                simplified_ua = self.simplify_user_agent(user_agent)
                aggregations.append({
                    'type': 'user_agent_stats',
                    'key': simplified_ua,
                    'timestamp': timestamp.isoformat(),
                    'metrics': {
                        'requests': 1,
                        'ip': ip
                    }
                })
            
            return {
                'aggregations': aggregations,
                'original_record': log_data
            }
            
        except Exception as e:
            logger.error(f"处理访问日志聚合失败: {e}")
            return None
    
    def process_real_time_alerts(self, record: StreamRecord) -> Optional[Dict]:
        """处理实时告警"""
        try:
            log_data = record.value
            metadata = log_data.get('metadata', {})
            
            alerts = []
            
            # 检查HTTP错误
            status_code = metadata.get('status_code')
            if status_code and status_code >= 400:
                severity = 'high' if status_code >= 500 else 'medium'
                alerts.append({
                    'type': 'http_error',
                    'severity': severity,
                    'message': f"HTTP {status_code} error detected",
                    'details': {
                        'ip': metadata.get('ip'),
                        'path': metadata.get('path'),
                        'status_code': status_code,
                        'user_agent': metadata.get('user_agent')
                    },
                    'timestamp': log_data.get('timestamp')
                })
            
            # 检查可疑IP
            ip = metadata.get('ip')
            if ip and self.is_suspicious_ip(ip, record.timestamp):
                alerts.append({
                    'type': 'suspicious_ip',
                    'severity': 'high',
                    'message': f"Suspicious activity from IP {ip}",
                    'details': {
                        'ip': ip,
                        'path': metadata.get('path'),
                        'request_count': self.get_ip_request_count(ip)
                    },
                    'timestamp': log_data.get('timestamp')
                })
            
            # 检查异常路径
            path = metadata.get('path')
            if path and self.is_suspicious_path(path):
                alerts.append({
                    'type': 'suspicious_path',
                    'severity': 'medium',
                    'message': f"Suspicious path access: {path}",
                    'details': {
                        'ip': ip,
                        'path': path,
                        'user_agent': metadata.get('user_agent')
                    },
                    'timestamp': log_data.get('timestamp')
                })
            
            if alerts:
                return {
                    'alerts': alerts,
                    'original_record': log_data
                }
            
            return None
            
        except Exception as e:
            logger.error(f"处理实时告警失败: {e}")
            return None
    
    def process_windowed_aggregation(self, record: StreamRecord) -> Optional[Dict]:
        """处理窗口聚合"""
        try:
            log_data = record.value
            timestamp = datetime.fromtimestamp(record.timestamp / 1000)
            
            # 定义窗口大小(5分钟)
            window_size = timedelta(minutes=5)
            window_start = timestamp.replace(second=0, microsecond=0)
            window_start = window_start.replace(minute=(window_start.minute // 5) * 5)
            window_end = window_start + window_size
            
            window_key = f"{window_start.isoformat()}_{window_end.isoformat()}"
            
            # 初始化窗口数据
            if window_key not in self.windows:
                self.windows[window_key] = {
                    'start_time': window_start,
                    'end_time': window_end,
                    'total_requests': 0,
                    'unique_ips': set(),
                    'status_codes': defaultdict(int),
                    'top_paths': defaultdict(int),
                    'error_count': 0,
                    'total_bytes': 0
                }
            
            window_data = self.windows[window_key]
            metadata = log_data.get('metadata', {})
            
            # 更新窗口统计
            window_data['total_requests'] += 1
            
            if metadata.get('ip'):
                window_data['unique_ips'].add(metadata['ip'])
            
            if metadata.get('status_code'):
                status_code = metadata['status_code']
                window_data['status_codes'][status_code] += 1
                if status_code >= 400:
                    window_data['error_count'] += 1
            
            if metadata.get('path'):
                window_data['top_paths'][metadata['path']] += 1
            
            if metadata.get('response_size'):
                window_data['total_bytes'] += metadata['response_size']
            
            # 检查窗口是否完成
            if datetime.now() > window_end:
                # 生成窗口聚合结果
                result = self.finalize_window(window_key, window_data)
                # 清理窗口数据
                del self.windows[window_key]
                return result
            
            return None
            
        except Exception as e:
            logger.error(f"处理窗口聚合失败: {e}")
            return None
    
    def finalize_window(self, window_key: str, window_data: Dict) -> Dict:
        """完成窗口聚合"""
        # 转换set为list以便JSON序列化
        unique_ips_count = len(window_data['unique_ips'])
        
        # 获取Top路径
        top_paths = sorted(
            window_data['top_paths'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:10]
        
        # 计算错误率
        error_rate = (window_data['error_count'] / window_data['total_requests'] * 100 
                     if window_data['total_requests'] > 0 else 0)
        
        return {
            'type': 'window_aggregation',
            'window_start': window_data['start_time'].isoformat(),
            'window_end': window_data['end_time'].isoformat(),
            'metrics': {
                'total_requests': window_data['total_requests'],
                'unique_ips': unique_ips_count,
                'error_count': window_data['error_count'],
                'error_rate': round(error_rate, 2),
                'total_bytes': window_data['total_bytes'],
                'status_codes': dict(window_data['status_codes']),
                'top_paths': top_paths
            }
        }
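
    # 窗口对齐示例(示意): 12:07:31 到达的记录落入 [12:05:00, 12:10:00) 窗口;
    # 窗口结束后(有新记录到达触发检查时)由 finalize_window 输出该5分钟内的请求数、独立IP数、错误率等指标。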
    
    def simplify_user_agent(self, user_agent: str) -> str:
        """简化用户代理字符串(按常见浏览器/爬虫关键字归类)"""
        if not user_agent:
            return 'Unknown'
        ua = user_agent.lower()
        keywords = ('bot', 'spider', 'crawler', 'curl', 'wget', 'chrome', 'firefox', 'safari')
        matched = next((k for k in keywords if k in ua), None)
        return matched.capitalize() if matched else 'Other'
    
    async def analyze_data(self, message: PipelineMessage) -> Optional[PipelineMessage]:
        """实时数据分析"""
        try:
            data = message.data.copy()
            analysis_metadata = message.metadata.copy()
            
            # 计算访问模式
            access_pattern = self.calculate_access_pattern(data)
            data['access_pattern'] = access_pattern
            
            # 异常检测
            anomaly_score = self.detect_anomaly(data)
            data['anomaly_score'] = anomaly_score
            
            # 性能指标
            performance_metrics = self.calculate_performance_metrics(data)
            data['performance'] = performance_metrics
            
            # 添加分析时间戳
            analysis_metadata['analyzed_at'] = datetime.now().isoformat()
            
            return PipelineMessage(
                id=message.id,
                timestamp=message.timestamp,
                source=message.source,
                data=data,
                metadata=analysis_metadata,
                stage='real_time_analysis'
            )
            
        except Exception as e:
            logger.error(f"实时数据分析失败: {e}")
            return None
    
    def calculate_access_pattern(self, data: Dict) -> Dict:
        """计算访问模式"""
        try:
            pattern = {
                'is_bot': False,
                'is_suspicious': False,
                'frequency_score': 0.0,
                'path_diversity': 0.0
            }
            
            # 检测机器人
            if 'browser' in data and data['browser'].get('device_type') == 'Bot':
                pattern['is_bot'] = True
            
            # 检测可疑活动
            if 'path' in data:
                path = data['path'].lower()
                suspicious_patterns = ['.env', 'wp-admin', 'phpmyadmin', 'admin.php', 'config.php']
                if any(sp in path for sp in suspicious_patterns):
                    pattern['is_suspicious'] = True
            
            # 计算频率分数(需要历史数据支持)
            pattern['frequency_score'] = 1.0  # 简化实现
            
            return pattern
            
        except Exception as e:
            logger.error(f"计算访问模式失败: {e}")
            return {}
    
    def detect_anomaly(self, data: Dict) -> float:
        """异常检测"""
        try:
            anomaly_score = 0.0
            
            # 检查状态码异常
            if 'status' in data:
                status = int(data['status'])
                if status >= 400:
                    anomaly_score += 0.3
                if status >= 500:
                    anomaly_score += 0.5
            
            # 检查响应大小异常
            if 'size' in data:
                size = int(data.get('size', 0))
                if size > 10 * 1024 * 1024:  # 大于10MB
                    anomaly_score += 0.2
            
            # 检查路径异常
            if 'access_pattern' in data and data['access_pattern'].get('is_suspicious'):
                anomaly_score += 0.4
            
            return min(anomaly_score, 1.0)
            
        except Exception as e:
            logger.error(f"异常检测失败: {e}")
            return 0.0
    
    def calculate_performance_metrics(self, data: Dict) -> Dict:
        """计算性能指标"""
        try:
            metrics = {
                'response_time': 0.0,
                'throughput': 0.0,
                'error_rate': 0.0
            }
            
            # 响应时间(如果有的话)
            if 'response_time' in data:
                metrics['response_time'] = float(data['response_time'])
            
            # 吞吐量计算(简化)
            if 'size' in data:
                size = int(data.get('size', 0))
                metrics['throughput'] = size / 1024.0  # 响应大小(KB),作为吞吐量的粗略近似
            
            # 错误率
            if 'status' in data:
                status = int(data['status'])
                metrics['error_rate'] = 1.0 if status >= 400 else 0.0
            
            return metrics
            
        except Exception as e:
            logger.error(f"计算性能指标失败: {e}")
            return {}
    
    async def detect_alerts(self, message: PipelineMessage) -> Optional[PipelineMessage]:
        """告警检测"""
        try:
            data = message.data.copy()
            alerts = []
            
            # 高异常分数告警
            if data.get('anomaly_score', 0) > 0.7:
                alerts.append({
                    'type': 'high_anomaly',
                    'severity': 'warning',
                    'message': f"检测到高异常分数: {data['anomaly_score']}",
                    'timestamp': datetime.now().isoformat()
                })
            
            # 错误状态码告警
            if 'status' in data and int(data['status']) >= 500:
                alerts.append({
                    'type': 'server_error',
                    'severity': 'critical',
                    'message': f"服务器错误: {data['status']}",
                    'timestamp': datetime.now().isoformat()
                })
            
            # 可疑访问告警
            if data.get('access_pattern', {}).get('is_suspicious'):
                alerts.append({
                    'type': 'suspicious_access',
                    'severity': 'warning',
                    'message': f"检测到可疑访问: {data.get('path', 'unknown')}",
                    'timestamp': datetime.now().isoformat()
                })
            
            # 大文件传输告警
            if 'size' in data and int(data['size']) > 50 * 1024 * 1024:  # 大于50MB
                alerts.append({
                    'type': 'large_transfer',
                    'severity': 'info',
                    'message': f"大文件传输: {data['size']} bytes",
                    'timestamp': datetime.now().isoformat()
                })
            
            if alerts:
                data['alerts'] = alerts
                return PipelineMessage(
                    id=message.id,
                    timestamp=message.timestamp,
                    source=message.source,
                    data=data,
                    metadata={**message.metadata, 'has_alerts': True},
                    stage='alert_detection'
                )
            
            return None  # 没有告警
            
        except Exception as e:
            logger.error(f"告警检测失败: {e}")
            return None
    
    async def consume_messages(self):
        """消费消息"""
        if not self.connections['kafka_consumer']:
            logger.error("Kafka消费者未初始化")
            return
        
        try:
            async for message in self.connections['kafka_consumer']:
                if not self.running:
                    break
                
                try:
                    # 解析消息
                    pipeline_message = PipelineMessage(
                        id=message.key or str(time.time()),
                        timestamp=datetime.now(),
                        source=message.topic,
                        data=message.value,
                        metadata={'kafka_offset': message.offset, 'kafka_partition': message.partition}
                    )
                    
                    # 添加到缓冲区
                    self.message_buffer.append(pipeline_message)
                    
                    # 处理消息管道
                    await self.process_pipeline(pipeline_message)
                    
                    # 更新统计信息
                    self.stats['total_processed'] += 1
                    
                except Exception as e:
                    logger.error(f"处理消息失败: {e}")
                    self.stats['total_errors'] += 1
                    ERROR_COUNT.labels(stage='message_consumption', error_type='processing').inc()
                
        except Exception as e:
            logger.error(f"消息消费异常: {e}")
    
    async def process_pipeline(self, message: PipelineMessage):
        """处理管道"""
        current_message = message
        
        # 按顺序执行管道阶段
        stage_order = ['data_cleaning', 'data_enrichment', 'real_time_analysis', 'alert_detection']
        
        for stage_name in stage_order:
            if stage_name not in self.stages:
                continue
            
            stage = self.stages[stage_name]
            
            # 处理消息
            result_messages = await self.process_message(current_message, stage_name)
            
            if not result_messages:
                break  # 处理失败或无结果
            
            # 发送到输出主题
            await self.send_to_outputs(result_messages, stage.output_topics)
            
            # 更新当前消息为第一个结果消息(用于下一阶段)
            if result_messages:
                current_message = result_messages[0]
    
    async def start_websocket_server(self, host: str = 'localhost', port: int = 8765):
        """启动WebSocket服务器"""
        async def handle_client(websocket, path):
            self.connections['websocket_clients'].add(websocket)
            ACTIVE_CONNECTIONS.labels(type='websocket').set(len(self.connections['websocket_clients']))
            logger.info(f"WebSocket客户端连接: {websocket.remote_address}")
            
            try:
                await websocket.wait_closed()
            finally:
                self.connections['websocket_clients'].discard(websocket)
                ACTIVE_CONNECTIONS.labels(type='websocket').set(len(self.connections['websocket_clients']))
                logger.info(f"WebSocket客户端断开: {websocket.remote_address}")
        
        start_server = websockets.serve(handle_client, host, port)
        logger.info(f"WebSocket服务器启动: ws://{host}:{port}")
        return start_server
    
    async def start_metrics_server(self, port: int = 8000):
        """启动指标服务器"""
        start_http_server(port)
        logger.info(f"Prometheus指标服务器启动: http://localhost:{port}")
    
    async def get_pipeline_stats(self) -> Dict:
        """获取管道统计信息"""
        current_time = datetime.now()
        time_diff = (current_time - self.stats['last_update']).total_seconds()
        
        if time_diff > 0:
            self.stats['processing_rate'] = self.stats['total_processed'] / time_diff
        
        self.stats['last_update'] = current_time
        
        return {
            'total_processed': self.stats['total_processed'],
            'total_errors': self.stats['total_errors'],
            'processing_rate': self.stats['processing_rate'],
            'buffer_size': len(self.message_buffer),
            'active_connections': {
                'kafka': 1 if self.connections['kafka_consumer'] else 0,
                'redis': 1 if self.connections['redis'] else 0,
                'elasticsearch': 1 if self.connections['elasticsearch'] else 0,
                'websocket': len(self.connections['websocket_clients'])
            },
            'stage_stats': dict(self.stats['stage_stats']),
            'last_update': self.stats['last_update'].isoformat()
        }
    
    async def cleanup(self):
        """清理资源"""
        logger.info("开始清理资源")
        
        self.running = False
        
        # 关闭Kafka连接
        if self.connections['kafka_consumer']:
            await self.connections['kafka_consumer'].stop()
        if self.connections['kafka_producer']:
            await self.connections['kafka_producer'].stop()
        
        # 关闭Redis连接
        if self.connections['redis']:
            self.connections['redis'].close()
            await self.connections['redis'].wait_closed()
        
        # 关闭Elasticsearch连接
        if self.connections['elasticsearch']:
            await self.connections['elasticsearch'].close()
        
        # 关闭WebSocket连接
        for client in self.connections['websocket_clients']:
            await client.close()
        
        # 关闭线程池
        self.executor.shutdown(wait=True)
        
        logger.info("资源清理完成")

# 主函数
async def main():
    """主函数"""
    import argparse
    
    parser = argparse.ArgumentParser(description='GoAccess实时数据管道')
    parser.add_argument('--config', required=True, help='配置文件路径')
    parser.add_argument('--websocket-host', default='localhost', help='WebSocket服务器主机')
    parser.add_argument('--websocket-port', type=int, default=8765, help='WebSocket服务器端口')
    parser.add_argument('--metrics-port', type=int, default=8000, help='指标服务器端口')
    
    args = parser.parse_args()
    
    # 创建管道实例
    pipeline = RealTimePipeline(args.config)
    
    try:
        # 初始化连接
        await pipeline.initialize_connections()
        
        # 启动指标服务器
        await pipeline.start_metrics_server(args.metrics_port)
        
        # 启动WebSocket服务器
        websocket_server = await pipeline.start_websocket_server(
            args.websocket_host, 
            args.websocket_port
        )
        
        # 启动管道
        await asyncio.gather(
            pipeline.run_pipeline(),
            websocket_server
        )
        
    except KeyboardInterrupt:
        logger.info("收到中断信号,正在停止...")
    except Exception as e:
        logger.error(f"管道运行失败: {e}")
    finally:
        await pipeline.cleanup()

if __name__ == '__main__':
    asyncio.run(main())

5.3 安全与合规

5.3.1 数据安全

#!/usr/bin/env python3
# security_manager.py - 安全管理器

import hashlib
import hmac
import secrets
import jwt
import bcrypt
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
import base64
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
import ipaddress
import re

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class SecurityPolicy:
    """安全策略"""
    encryption_enabled: bool = True
    data_retention_days: int = 90
    anonymization_enabled: bool = True
    access_control_enabled: bool = True
    audit_logging_enabled: bool = True
    ip_whitelist: List[str] = None
    ip_blacklist: List[str] = None
    rate_limit_per_minute: int = 1000
    require_authentication: bool = True
    password_min_length: int = 8
    session_timeout_minutes: int = 30

class SecurityManager:
    """安全管理器"""
    
    def __init__(self, config_file: str):
        self.config = self.load_config(config_file)
        self.policy = SecurityPolicy(**self.config.get('security_policy', {}))
        self.encryption_key = self.generate_or_load_key()
        self.cipher_suite = Fernet(self.encryption_key)
        self.jwt_secret = self.config.get('jwt_secret', secrets.token_urlsafe(32))
        self.active_sessions = {}
        self.rate_limits = {}
        self.audit_log = []
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载安全配置失败: {e}")
            return {}
    
    def generate_or_load_key(self) -> bytes:
        """生成或加载加密密钥"""
        key_file = self.config.get('encryption_key_file', 'encryption.key')
        
        try:
            with open(key_file, 'rb') as f:
                return f.read()
        except FileNotFoundError:
            # 生成新密钥
            password = self.config.get('encryption_password', 'default_password').encode()
            salt = secrets.token_bytes(16)
            
            kdf = PBKDF2HMAC(
                algorithm=hashes.SHA256(),
                length=32,
                salt=salt,
                iterations=100000,
                backend=default_backend()
            )
            
            key = base64.urlsafe_b64encode(kdf.derive(password))
            
            # 保存密钥
            with open(key_file, 'wb') as f:
                f.write(key)
            
            logger.info(f"生成新的加密密钥: {key_file}")
            return key
    
    def encrypt_data(self, data: str) -> str:
        """加密数据"""
        if not self.policy.encryption_enabled:
            return data
        
        try:
            encrypted_data = self.cipher_suite.encrypt(data.encode('utf-8'))
            return base64.urlsafe_b64encode(encrypted_data).decode('utf-8')
        except Exception as e:
            logger.error(f"数据加密失败: {e}")
            return data
    
    def decrypt_data(self, encrypted_data: str) -> str:
        """解密数据"""
        if not self.policy.encryption_enabled:
            return encrypted_data
        
        try:
            encrypted_bytes = base64.urlsafe_b64decode(encrypted_data.encode('utf-8'))
            decrypted_data = self.cipher_suite.decrypt(encrypted_bytes)
            return decrypted_data.decode('utf-8')
        except Exception as e:
            logger.error(f"数据解密失败: {e}")
            return encrypted_data
    
    def anonymize_ip(self, ip: str) -> str:
        """IP地址匿名化"""
        if not self.policy.anonymization_enabled:
            return ip
        
        try:
            ip_obj = ipaddress.ip_address(ip)
            
            if ip_obj.version == 4:
                # IPv4: 保留前3个字节,最后一个字节设为0
                parts = ip.split('.')
                return f"{parts[0]}.{parts[1]}.{parts[2]}.0"
            else:
                # IPv6: 保留前64位,后64位设为0
                return str(ipaddress.IPv6Address(int(ip_obj) & 0xFFFFFFFFFFFFFFFF0000000000000000))
        except Exception as e:
            logger.error(f"IP匿名化失败: {e}")
            return "0.0.0.0"
    
    def anonymize_user_agent(self, user_agent: str) -> str:
        """用户代理匿名化"""
        if not self.policy.anonymization_enabled:
            return user_agent
        
        try:
            # 移除版本号和详细信息
            anonymized = re.sub(r'\d+\.\d+\.\d+', 'X.X.X', user_agent)
            anonymized = re.sub(r'\([^)]*\)', '(anonymized)', anonymized)
            return anonymized
        except Exception as e:
            logger.error(f"用户代理匿名化失败: {e}")
            return "anonymized"
    
    def hash_sensitive_data(self, data: str, salt: str = None) -> Tuple[str, str]:
        """哈希敏感数据"""
        if salt is None:
            salt = secrets.token_hex(16)
        
        try:
            hash_obj = hashlib.pbkdf2_hmac('sha256', data.encode('utf-8'), salt.encode('utf-8'), 100000)
            return base64.urlsafe_b64encode(hash_obj).decode('utf-8'), salt
        except Exception as e:
            logger.error(f"数据哈希失败: {e}")
            return data, salt
    
    def verify_hash(self, data: str, hashed_data: str, salt: str) -> bool:
        """验证哈希"""
        try:
            computed_hash, _ = self.hash_sensitive_data(data, salt)
            return hmac.compare_digest(computed_hash, hashed_data)
        except Exception as e:
            logger.error(f"哈希验证失败: {e}")
            return False
    
    def check_ip_access(self, ip: str) -> bool:
        """检查IP访问权限"""
        if not self.policy.access_control_enabled:
            return True
        
        try:
            ip_obj = ipaddress.ip_address(ip)
            
            # 检查黑名单
            if self.policy.ip_blacklist:
                for blocked_ip in self.policy.ip_blacklist:
                    if ip_obj in ipaddress.ip_network(blocked_ip, strict=False):
                        self.log_security_event('ip_blocked', {'ip': ip, 'reason': 'blacklist'})
                        return False
            
            # 检查白名单
            if self.policy.ip_whitelist:
                for allowed_ip in self.policy.ip_whitelist:
                    if ip_obj in ipaddress.ip_network(allowed_ip, strict=False):
                        return True
                self.log_security_event('ip_blocked', {'ip': ip, 'reason': 'not_in_whitelist'})
                return False
            
            return True
            
        except Exception as e:
            logger.error(f"IP访问检查失败: {e}")
            return False
    
    def check_rate_limit(self, identifier: str) -> bool:
        """检查速率限制"""
        current_time = datetime.now()
        minute_key = current_time.strftime('%Y-%m-%d %H:%M')
        rate_key = f"{identifier}:{minute_key}"
        
        if rate_key not in self.rate_limits:
            self.rate_limits[rate_key] = 0
        
        self.rate_limits[rate_key] += 1
        
        # 清理过期的速率限制记录
        self.cleanup_rate_limits(current_time)
        
        if self.rate_limits[rate_key] > self.policy.rate_limit_per_minute:
            self.log_security_event('rate_limit_exceeded', {
                'identifier': identifier,
                'count': self.rate_limits[rate_key],
                'limit': self.policy.rate_limit_per_minute
            })
            return False
        
        return True
    
    def cleanup_rate_limits(self, current_time: datetime):
        """清理过期的速率限制记录"""
        cutoff_time = current_time - timedelta(minutes=2)
        cutoff_key = cutoff_time.strftime('%Y-%m-%d %H:%M')
        
        keys_to_remove = [key for key in self.rate_limits if key.split('|', 1)[0] < cutoff_key]
        for key in keys_to_remove:
            del self.rate_limits[key]
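
    # 速率限制键示例(示意): "2024-01-01 12:34|203.0.113.7" 表示该分钟内来自该标识的请求计数,
    # 早于两分钟前的键会在 cleanup_rate_limits 中被清理。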
    
    def create_session(self, user_id: str, user_data: Dict) -> str:
        """创建会话"""
        session_id = secrets.token_urlsafe(32)
        
        payload = {
            'session_id': session_id,
            'user_id': user_id,
            'user_data': user_data,
            'created_at': datetime.now().isoformat(),
            'expires_at': (datetime.now() + timedelta(minutes=self.policy.session_timeout_minutes)).isoformat()
        }
        
        token = jwt.encode(payload, self.jwt_secret, algorithm='HS256')
        
        self.active_sessions[session_id] = {
            'user_id': user_id,
            'created_at': datetime.now(),
            'last_activity': datetime.now(),
            'token': token
        }
        
        self.log_security_event('session_created', {'user_id': user_id, 'session_id': session_id})
        return token
    
    def validate_session(self, token: str) -> Optional[Dict]:
        """验证会话"""
        try:
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            session_id = payload['session_id']
            
            if session_id not in self.active_sessions:
                return None
            
            # 检查会话是否过期
            expires_at = datetime.fromisoformat(payload['expires_at'])
            if datetime.now() > expires_at:
                self.destroy_session(session_id)
                return None
            
            # 更新最后活动时间
            self.active_sessions[session_id]['last_activity'] = datetime.now()
            
            return payload
            
        except jwt.InvalidTokenError as e:
            logger.warning(f"无效的会话令牌: {e}")
            return None
        except Exception as e:
            logger.error(f"会话验证失败: {e}")
            return None
    
    def destroy_session(self, session_id: str):
        """销毁会话"""
        if session_id in self.active_sessions:
            user_id = self.active_sessions[session_id]['user_id']
            del self.active_sessions[session_id]
            self.log_security_event('session_destroyed', {'user_id': user_id, 'session_id': session_id})
    
    def cleanup_expired_sessions(self):
        """清理过期会话"""
        current_time = datetime.now()
        expired_sessions = []
        
        for session_id, session_data in self.active_sessions.items():
            last_activity = session_data['last_activity']
            if current_time - last_activity > timedelta(minutes=self.policy.session_timeout_minutes):
                expired_sessions.append(session_id)
        
        for session_id in expired_sessions:
            self.destroy_session(session_id)
    
    def hash_password(self, password: str) -> str:
        """哈希密码"""
        salt = bcrypt.gensalt()
        hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
        return hashed.decode('utf-8')
    
    def verify_password(self, password: str, hashed_password: str) -> bool:
        """验证密码"""
        try:
            return bcrypt.checkpw(password.encode('utf-8'), hashed_password.encode('utf-8'))
        except Exception as e:
            logger.error(f"密码验证失败: {e}")
            return False
    
    def validate_password_strength(self, password: str) -> Tuple[bool, List[str]]:
        """验证密码强度"""
        errors = []
        
        if len(password) < self.policy.password_min_length:
            errors.append(f"密码长度至少{self.policy.password_min_length}位")
        
        if not re.search(r'[A-Z]', password):
            errors.append("密码必须包含大写字母")
        
        if not re.search(r'[a-z]', password):
            errors.append("密码必须包含小写字母")
        
        if not re.search(r'\d', password):
            errors.append("密码必须包含数字")
        
        if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            errors.append("密码必须包含特殊字符")
        
        return len(errors) == 0, errors
    
    def log_security_event(self, event_type: str, details: Dict):
        """记录安全事件"""
        if not self.policy.audit_logging_enabled:
            return
        
        event = {
            'timestamp': datetime.now().isoformat(),
            'event_type': event_type,
            'details': details,
            'source': 'security_manager'
        }
        
        self.audit_log.append(event)
        logger.info(f"安全事件: {event_type} - {details}")
        
        # 保持审计日志大小
        if len(self.audit_log) > 10000:
            self.audit_log = self.audit_log[-5000:]
    
    def get_security_report(self) -> Dict:
        """获取安全报告"""
        current_time = datetime.now()
        
        # 统计最近24小时的安全事件
        recent_events = []
        cutoff_time = current_time - timedelta(hours=24)
        
        for event in self.audit_log:
            event_time = datetime.fromisoformat(event['timestamp'])
            if event_time > cutoff_time:
                recent_events.append(event)
        
        # 按事件类型分组
        event_counts = {}
        for event in recent_events:
            event_type = event['event_type']
            event_counts[event_type] = event_counts.get(event_type, 0) + 1
        
        return {
            'report_time': current_time.isoformat(),
            'active_sessions': len(self.active_sessions),
            'recent_events_24h': len(recent_events),
            'event_counts': event_counts,
            'security_policy': {
                'encryption_enabled': self.policy.encryption_enabled,
                'anonymization_enabled': self.policy.anonymization_enabled,
                'access_control_enabled': self.policy.access_control_enabled,
                'audit_logging_enabled': self.policy.audit_logging_enabled,
                'rate_limit_per_minute': self.policy.rate_limit_per_minute
            },
            'rate_limit_status': len(self.rate_limits),
            'data_retention_days': self.policy.data_retention_days
        }
    
    def sanitize_log_data(self, log_data: Dict) -> Dict:
        """清理日志数据"""
        sanitized = log_data.copy()
        
        # 匿名化IP地址
        if 'ip' in sanitized:
            sanitized['ip'] = self.anonymize_ip(sanitized['ip'])
        
        # 匿名化用户代理
        if 'user_agent' in sanitized:
            sanitized['user_agent'] = self.anonymize_user_agent(sanitized['user_agent'])
        
        # 移除敏感字段
        sensitive_fields = ['password', 'token', 'session_id', 'api_key']
        for field in sensitive_fields:
            if field in sanitized:
                sanitized[field] = '[REDACTED]'
        
        # 加密敏感数据
        if 'email' in sanitized:
            sanitized['email'] = self.encrypt_data(sanitized['email'])
        
        return sanitized

# 使用示例
if __name__ == '__main__':
    # 创建安全管理器
    security_manager = SecurityManager('security_config.json')
    
    # 测试IP访问控制
    test_ip = '192.168.1.100'
    if security_manager.check_ip_access(test_ip):
        print(f"IP {test_ip} 访问允许")
    else:
        print(f"IP {test_ip} 访问被拒绝")
    
    # 测试速率限制
    for i in range(5):
        if security_manager.check_rate_limit(test_ip):
            print(f"请求 {i+1} 通过速率限制")
        else:
            print(f"请求 {i+1} 被速率限制")
    
    # 测试数据加密
    sensitive_data = "用户敏感信息"
    encrypted = security_manager.encrypt_data(sensitive_data)
    decrypted = security_manager.decrypt_data(encrypted)
    print(f"原始数据: {sensitive_data}")
    print(f"加密数据: {encrypted}")
    print(f"解密数据: {decrypted}")
    
    # 测试IP匿名化
    original_ip = "192.168.1.100"
    anonymized_ip = security_manager.anonymize_ip(original_ip)
    print(f"原始IP: {original_ip}")
    print(f"匿名化IP: {anonymized_ip}")
    
    # 生成安全报告
    report = security_manager.get_security_report()
    print("\n安全报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))
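
在实际日志管道中,通常先把一行访问日志解析成字段字典,再交给上文的 SecurityManager 做访问控制与脱敏,最后才送入 GoAccess 或落盘存储。下面给出一个最小化的示意脚本,假设上文的 SecurityManager 可以从 security_manager.py 导入(模块名与正则均为演示用的假设,正则只覆盖 COMBINED 格式):

#!/usr/bin/env python3
# sanitize_pipeline_example.py - 解析 COMBINED 日志并交给 SecurityManager 脱敏(示意)

import re

# 假设上文的 SecurityManager 保存在 security_manager.py 中(模块名为假设)
from security_manager import SecurityManager

# 简化版 COMBINED 日志正则,仅用于演示字段提取
COMBINED_RE = re.compile(
    r'^(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<size>\S+) '
    r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"$'
)

def parse_combined_line(line: str) -> dict:
    """把一行 COMBINED 格式日志解析为字段字典,解析失败返回空字典"""
    match = COMBINED_RE.match(line.strip())
    return match.groupdict() if match else {}

if __name__ == '__main__':
    manager = SecurityManager('security_config.json')

    sample = ('192.168.1.100 - - [10/Oct/2024:13:55:36 +0800] '
              '"GET /index.html HTTP/1.1" 200 2326 '
              '"https://example.com/" "Mozilla/5.0"')

    record = parse_combined_line(sample)
    if record and manager.check_ip_access(record['ip']) and manager.check_rate_limit(record['ip']):
        sanitized = manager.sanitize_log_data(record)
        print(sanitized)  # ip 与 user_agent 字段已按安全策略匿名化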

5.3.2 合规性管理

#!/usr/bin/env python3
# compliance_manager.py - 合规性管理器

import json
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import uuid
import os
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ComplianceStandard(Enum):
    """合规标准"""
    GDPR = "gdpr"  # 欧盟通用数据保护条例
    CCPA = "ccpa"  # 加州消费者隐私法
    HIPAA = "hipaa"  # 健康保险便携性和责任法案
    SOX = "sox"  # 萨班斯-奥克斯利法案
    PCI_DSS = "pci_dss"  # 支付卡行业数据安全标准
    ISO27001 = "iso27001"  # ISO 27001信息安全管理

@dataclass
class DataProcessingRecord:
    """数据处理记录"""
    id: str
    timestamp: datetime
    data_type: str
    processing_purpose: str
    legal_basis: str
    data_subject: str
    retention_period: int  # 天数
    anonymized: bool
    encrypted: bool
    shared_with: List[str]
    location: str
    compliance_standards: List[ComplianceStandard]

@dataclass
class ConsentRecord:
    """同意记录"""
    id: str
    data_subject: str
    consent_type: str
    granted_at: datetime
    expires_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    purpose: str
    legal_basis: str
    is_active: bool

@dataclass
class DataRetentionPolicy:
    """数据保留策略"""
    data_type: str
    retention_days: int
    auto_delete: bool
    archive_before_delete: bool
    compliance_requirements: List[ComplianceStandard]
    deletion_method: str  # 'secure_delete', 'anonymize', 'archive'

class ComplianceManager:
    """合规性管理器"""
    
    def __init__(self, config_file: str, db_path: str = 'compliance.db'):
        self.config = self.load_config(config_file)
        self.db_path = db_path
        self.init_database()
        self.retention_policies = self.load_retention_policies()
        self.compliance_standards = self.config.get('compliance_standards', [])
    
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载合规配置失败: {e}")
            return {}
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 数据处理记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_processing_records (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                data_type TEXT NOT NULL,
                processing_purpose TEXT NOT NULL,
                legal_basis TEXT NOT NULL,
                data_subject TEXT NOT NULL,
                retention_period INTEGER NOT NULL,
                anonymized BOOLEAN NOT NULL,
                encrypted BOOLEAN NOT NULL,
                shared_with TEXT,
                location TEXT NOT NULL,
                compliance_standards TEXT NOT NULL
            )
        ''')
        
        # 同意记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS consent_records (
                id TEXT PRIMARY KEY,
                data_subject TEXT NOT NULL,
                consent_type TEXT NOT NULL,
                granted_at TEXT NOT NULL,
                expires_at TEXT,
                withdrawn_at TEXT,
                withdrawal_reason TEXT,
                purpose TEXT NOT NULL,
                legal_basis TEXT NOT NULL,
                is_active BOOLEAN NOT NULL
            )
        ''')
        
        # 数据删除记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_deletion_records (
                id TEXT PRIMARY KEY,
                data_subject TEXT NOT NULL,
                data_type TEXT NOT NULL,
                deletion_reason TEXT NOT NULL,
                deleted_at TEXT NOT NULL,
                deletion_method TEXT NOT NULL,
                verified_by TEXT NOT NULL
            )
        ''')
        
        # 合规审计日志表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS compliance_audit_log (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                event_type TEXT NOT NULL,
                compliance_standard TEXT NOT NULL,
                details TEXT NOT NULL,
                risk_level TEXT NOT NULL,
                resolved BOOLEAN NOT NULL
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def load_retention_policies(self) -> List[DataRetentionPolicy]:
        """加载数据保留策略"""
        policies_config = self.config.get('retention_policies', [])
        policies = []
        
        for policy_config in policies_config:
            policy = DataRetentionPolicy(
                data_type=policy_config['data_type'],
                retention_days=policy_config['retention_days'],
                auto_delete=policy_config.get('auto_delete', False),
                archive_before_delete=policy_config.get('archive_before_delete', True),
                compliance_requirements=[ComplianceStandard(std) for std in policy_config.get('compliance_requirements', [])],
                deletion_method=policy_config.get('deletion_method', 'secure_delete')
            )
            policies.append(policy)
        
        return policies
    
    def record_data_processing(self, record: DataProcessingRecord) -> bool:
        """记录数据处理活动"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO data_processing_records (
                    id, timestamp, data_type, processing_purpose, legal_basis,
                    data_subject, retention_period, anonymized, encrypted,
                    shared_with, location, compliance_standards
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                record.id,
                record.timestamp.isoformat(),
                record.data_type,
                record.processing_purpose,
                record.legal_basis,
                record.data_subject,
                record.retention_period,
                record.anonymized,
                record.encrypted,
                json.dumps(record.shared_with),
                record.location,
                json.dumps([std.value for std in record.compliance_standards])
            ))
            
            conn.commit()
            conn.close()
            
            logger.info(f"记录数据处理活动: {record.id}")
            return True
            
        except Exception as e:
            logger.error(f"记录数据处理活动失败: {e}")
            return False
    
    def record_consent(self, consent: ConsentRecord) -> bool:
        """记录用户同意"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO consent_records (
                    id, data_subject, consent_type, granted_at, expires_at,
                    withdrawn_at, purpose, legal_basis, is_active
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                consent.id,
                consent.data_subject,
                consent.consent_type,
                consent.granted_at.isoformat(),
                consent.expires_at.isoformat() if consent.expires_at else None,
                consent.withdrawn_at.isoformat() if consent.withdrawn_at else None,
                consent.purpose,
                consent.legal_basis,
                consent.is_active
            ))
            
            conn.commit()
            conn.close()
            
            logger.info(f"记录用户同意: {consent.id}")
            return True
            
        except Exception as e:
            logger.error(f"记录用户同意失败: {e}")
            return False
    
    def withdraw_consent(self, consent_id: str, withdrawal_reason: str) -> bool:
        """撤回用户同意"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                UPDATE consent_records 
                SET withdrawn_at = ?, is_active = ?, withdrawal_reason = ?
                WHERE id = ?
            ''', (
                datetime.now().isoformat(),
                False,
                withdrawal_reason,
                consent_id
            ))
            
            conn.commit()
            conn.close()
            
            # 记录审计日志
            self.log_compliance_event(
                'consent_withdrawn',
                ComplianceStandard.GDPR,
                {'consent_id': consent_id, 'reason': withdrawal_reason},
                'medium'
            )
            
            logger.info(f"撤回用户同意: {consent_id}")
            return True
            
        except Exception as e:
            logger.error(f"撤回用户同意失败: {e}")
            return False
    
    def check_data_retention(self) -> List[Dict]:
        """检查数据保留期限"""
        expired_data = []
        
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                SELECT * FROM data_processing_records
                WHERE datetime(timestamp, '+' || retention_period || ' days') < datetime('now')
            ''')
            
            rows = cursor.fetchall()
            columns = [description[0] for description in cursor.description]
            
            for row in rows:
                record_dict = dict(zip(columns, row))
                expired_data.append(record_dict)
            
            conn.close()
            
            if expired_data:
                self.log_compliance_event(
                    'data_retention_violation',
                    ComplianceStandard.GDPR,
                    {'expired_records_count': len(expired_data)},
                    'high'
                )
            
            return expired_data
            
        except Exception as e:
            logger.error(f"检查数据保留期限失败: {e}")
            return []
    
    def auto_delete_expired_data(self) -> int:
        """自动删除过期数据"""
        expired_data = self.check_data_retention()
        deleted_count = 0
        
        for record in expired_data:
            data_type = record['data_type']
            policy = self.get_retention_policy(data_type)
            
            if policy and policy.auto_delete:
                if self.delete_data_record(record['id'], 'automatic_retention_policy'):
                    deleted_count += 1
        
        if deleted_count > 0:
            logger.info(f"自动删除过期数据: {deleted_count} 条记录")
        
        return deleted_count
    
    def get_retention_policy(self, data_type: str) -> Optional[DataRetentionPolicy]:
        """获取数据保留策略"""
        for policy in self.retention_policies:
            if policy.data_type == data_type:
                return policy
        return None
    
    def delete_data_record(self, record_id: str, deletion_reason: str) -> bool:
        """删除数据记录"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 获取记录详情
            cursor.execute('SELECT * FROM data_processing_records WHERE id = ?', (record_id,))
            record = cursor.fetchone()
            
            if not record:
                logger.warning(f"未找到数据记录: {record_id}")
                return False
            
            # 记录删除操作
            deletion_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO data_deletion_records (
                    id, data_subject, data_type, deletion_reason,
                    deleted_at, deletion_method, verified_by
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                deletion_id,
                record[5],  # data_subject
                record[2],  # data_type
                deletion_reason,
                datetime.now().isoformat(),
                'secure_delete',
                'system'
            ))
            
            # 删除原记录
            cursor.execute('DELETE FROM data_processing_records WHERE id = ?', (record_id,))
            
            conn.commit()
            conn.close()
            
            logger.info(f"删除数据记录: {record_id}")
            return True
            
        except Exception as e:
            logger.error(f"删除数据记录失败: {e}")
            return False
    
    def generate_data_subject_report(self, data_subject: str) -> Dict:
        """生成数据主体报告(GDPR第15条)"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 获取数据处理记录
            cursor.execute('''
                SELECT * FROM data_processing_records WHERE data_subject = ?
            ''', (data_subject,))
            processing_records = cursor.fetchall()
            
            # 获取同意记录
            cursor.execute('''
                SELECT * FROM consent_records WHERE data_subject = ?
            ''', (data_subject,))
            consent_records = cursor.fetchall()
            
            # 获取删除记录
            cursor.execute('''
                SELECT * FROM data_deletion_records WHERE data_subject = ?
            ''', (data_subject,))
            deletion_records = cursor.fetchall()
            
            conn.close()
            
            report = {
                'data_subject': data_subject,
                'report_generated_at': datetime.now().isoformat(),
                'processing_records': len(processing_records),
                'consent_records': len(consent_records),
                'deletion_records': len(deletion_records),
                'data_categories': list(set([record[2] for record in processing_records])),
                'processing_purposes': list(set([record[3] for record in processing_records])),
                'legal_bases': list(set([record[4] for record in processing_records])),
                'data_shared_with': [],
                'retention_periods': {}
            }
            
            # 汇总共享信息
            for record in processing_records:
                shared_with = json.loads(record[9]) if record[9] else []
                report['data_shared_with'].extend(shared_with)
            
            report['data_shared_with'] = list(set(report['data_shared_with']))
            
            # 汇总保留期限
            for record in processing_records:
                data_type = record[2]
                retention_period = record[6]
                if data_type not in report['retention_periods']:
                    report['retention_periods'][data_type] = retention_period
            
            return report
            
        except Exception as e:
            logger.error(f"生成数据主体报告失败: {e}")
            return {}
    
    def log_compliance_event(self, event_type: str, standard: ComplianceStandard, 
                           details: Dict, risk_level: str):
        """记录合规事件"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            event_id = str(uuid.uuid4())
            cursor.execute('''
                INSERT INTO compliance_audit_log (
                    id, timestamp, event_type, compliance_standard,
                    details, risk_level, resolved
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                event_id,
                datetime.now().isoformat(),
                event_type,
                standard.value,
                json.dumps(details),
                risk_level,
                False
            ))
            
            conn.commit()
            conn.close()
            
            logger.info(f"记录合规事件: {event_type} - {standard.value}")
            
        except Exception as e:
            logger.error(f"记录合规事件失败: {e}")
    
    def generate_compliance_report(self, standard: ComplianceStandard, 
                                 start_date: datetime, end_date: datetime) -> Dict:
        """生成合规报告"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 获取指定时间范围内的合规事件
            cursor.execute('''
                SELECT * FROM compliance_audit_log 
                WHERE compliance_standard = ? 
                AND datetime(timestamp) BETWEEN datetime(?) AND datetime(?)
            ''', (standard.value, start_date.isoformat(), end_date.isoformat()))
            
            events = cursor.fetchall()
            
            # 统计事件类型
            event_counts = {}
            risk_levels = {'low': 0, 'medium': 0, 'high': 0, 'critical': 0}
            resolved_count = 0
            
            for event in events:
                event_type = event[2]
                risk_level = event[5]
                resolved = event[6]
                
                event_counts[event_type] = event_counts.get(event_type, 0) + 1
                risk_levels[risk_level] = risk_levels.get(risk_level, 0) + 1
                
                if resolved:
                    resolved_count += 1
            
            # 获取数据处理统计
            cursor.execute('''
                SELECT COUNT(*) FROM data_processing_records 
                WHERE datetime(timestamp) BETWEEN datetime(?) AND datetime(?)
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            processing_count = cursor.fetchone()[0]
            
            # 获取同意统计
            cursor.execute('''
                SELECT COUNT(*) FROM consent_records 
                WHERE datetime(granted_at) BETWEEN datetime(?) AND datetime(?)
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            consent_count = cursor.fetchone()[0]
            
            conn.close()
            
            report = {
                'compliance_standard': standard.value,
                'report_period': {
                    'start_date': start_date.isoformat(),
                    'end_date': end_date.isoformat()
                },
                'summary': {
                    'total_events': len(events),
                    'resolved_events': resolved_count,
                    'unresolved_events': len(events) - resolved_count,
                    'data_processing_activities': processing_count,
                    'consent_records': consent_count
                },
                'event_breakdown': event_counts,
                'risk_distribution': risk_levels,
                'compliance_score': self.calculate_compliance_score(events),
                'recommendations': self.generate_compliance_recommendations(events)
            }
            
            return report
            
        except Exception as e:
            logger.error(f"生成合规报告失败: {e}")
            return {}
    
    def calculate_compliance_score(self, events: List) -> float:
        """计算合规分数"""
        if not events:
            return 100.0
        
        total_score = 100.0
        
        for event in events:
            risk_level = event[5]
            resolved = event[6]
            
            # 根据风险级别扣分
            if risk_level == 'low':
                penalty = 1.0
            elif risk_level == 'medium':
                penalty = 3.0
            elif risk_level == 'high':
                penalty = 7.0
            else:  # critical
                penalty = 15.0
            
            # 如果已解决,减少扣分
            if resolved:
                penalty *= 0.3
            
            total_score -= penalty
        
        return max(0.0, total_score)
    
    def generate_compliance_recommendations(self, events: List) -> List[str]:
        """生成合规建议"""
        recommendations = []
        
        # 统计未解决的高风险事件
        high_risk_unresolved = sum(1 for event in events 
                                 if event[5] in ['high', 'critical'] and not event[6])
        
        if high_risk_unresolved > 0:
            recommendations.append(f"立即处理 {high_risk_unresolved} 个高风险未解决事件")
        
        # 检查数据保留违规
        retention_violations = sum(1 for event in events 
                                 if event[2] == 'data_retention_violation')
        
        if retention_violations > 0:
            recommendations.append("建立自动化数据保留管理流程")
        
        # 检查同意管理
        consent_issues = sum(1 for event in events 
                           if 'consent' in event[2])
        
        if consent_issues > 0:
            recommendations.append("加强用户同意管理和记录")
        
        if not recommendations:
            recommendations.append("当前合规状态良好,继续保持")
        
        return recommendations
    
    def export_compliance_data(self, output_dir: str, standard: ComplianceStandard) -> bool:
        """导出合规数据"""
        try:
            output_path = Path(output_dir)
            output_path.mkdir(parents=True, exist_ok=True)
            
            conn = sqlite3.connect(self.db_path)
            
            # 导出数据处理记录
            cursor = conn.cursor()
            cursor.execute('SELECT * FROM data_processing_records')
            processing_data = cursor.fetchall()
            
            with open(output_path / 'data_processing_records.json', 'w', encoding='utf-8') as f:
                json.dump(processing_data, f, indent=2, ensure_ascii=False, default=str)
            
            # 导出同意记录
            cursor.execute('SELECT * FROM consent_records')
            consent_data = cursor.fetchall()
            
            with open(output_path / 'consent_records.json', 'w', encoding='utf-8') as f:
                json.dump(consent_data, f, indent=2, ensure_ascii=False, default=str)
            
            # 导出审计日志
            cursor.execute('SELECT * FROM compliance_audit_log WHERE compliance_standard = ?', 
                         (standard.value,))
            audit_data = cursor.fetchall()
            
            with open(output_path / f'audit_log_{standard.value}.json', 'w', encoding='utf-8') as f:
                json.dump(audit_data, f, indent=2, ensure_ascii=False, default=str)
            
            conn.close()
            
            logger.info(f"合规数据导出完成: {output_dir}")
            return True
            
        except Exception as e:
            logger.error(f"导出合规数据失败: {e}")
            return False

# 使用示例
if __name__ == '__main__':
    # 创建合规管理器
    compliance_manager = ComplianceManager('compliance_config.json')
    
    # 记录数据处理活动
    processing_record = DataProcessingRecord(
        id=str(uuid.uuid4()),
        timestamp=datetime.now(),
        data_type='access_logs',
        processing_purpose='website_analytics',
        legal_basis='legitimate_interest',
        data_subject='user123',
        retention_period=90,
        anonymized=True,
        encrypted=True,
        shared_with=['analytics_team'],
        location='eu_west',
        compliance_standards=[ComplianceStandard.GDPR]
    )
    
    compliance_manager.record_data_processing(processing_record)
    
    # 记录用户同意
    consent_record = ConsentRecord(
        id=str(uuid.uuid4()),
        data_subject='user123',
        consent_type='analytics_cookies',
        granted_at=datetime.now(),
        expires_at=datetime.now() + timedelta(days=365),
        withdrawn_at=None,
        purpose='website_analytics',
        legal_basis='consent',
        is_active=True
    )
    
    compliance_manager.record_consent(consent_record)
    
    # 检查数据保留
    expired_data = compliance_manager.check_data_retention()
    print(f"过期数据记录: {len(expired_data)}")
    
    # 生成数据主体报告
    subject_report = compliance_manager.generate_data_subject_report('user123')
    print("数据主体报告:")
    print(json.dumps(subject_report, indent=2, ensure_ascii=False))
    
    # 生成合规报告
    start_date = datetime.now() - timedelta(days=30)
    end_date = datetime.now()
    compliance_report = compliance_manager.generate_compliance_report(
        ComplianceStandard.GDPR, start_date, end_date
    )
    print("\n合规报告:")
    print(json.dumps(compliance_report, indent=2, ensure_ascii=False))
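
ComplianceManager 初始化时会从配置文件读取 compliance_standards 与 retention_policies(对应上文的 load_retention_policies)。下面给出一份与这些字段对应的 compliance_config.json 结构示例,字段名取自上文代码,具体取值仅作演示,需根据实际合规要求调整:

{
  "compliance_standards": ["gdpr", "ccpa"],
  "retention_policies": [
    {
      "data_type": "access_logs",
      "retention_days": 90,
      "auto_delete": true,
      "archive_before_delete": true,
      "compliance_requirements": ["gdpr"],
      "deletion_method": "secure_delete"
    },
    {
      "data_type": "error_logs",
      "retention_days": 30,
      "auto_delete": false,
      "archive_before_delete": true,
      "compliance_requirements": ["gdpr", "ccpa"],
      "deletion_method": "anonymize"
    }
  ]
}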

5.4 企业级部署配置

5.4.1 Kubernetes部署配置

# k8s-deployment.yaml - Kubernetes部署配置
apiVersion: v1
kind: Namespace
metadata:
  name: goaccess-enterprise
  labels:
    name: goaccess-enterprise
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: goaccess-config
  namespace: goaccess-enterprise
data:
  goaccess.conf: |
    time-format %H:%M:%S
    date-format %d/%b/%Y
    log-format COMBINED
    
    real-time-html true
    ws-url wss://goaccess.example.com:7890
    port 7890
    
    persist true
    restore true
    db-path /data/goaccess.db
    
    geoip-database /usr/share/GeoIP/GeoLite2-City.mmdb
    
    exclude-ip 127.0.0.1
    exclude-ip 10.0.0.0/8
    exclude-ip 172.16.0.0/12
    exclude-ip 192.168.0.0/16
    
    ignore-crawlers true
    crawlers-only false
    
    enable-panel VISITORS
    enable-panel REQUESTS
    enable-panel REQUESTS_STATIC
    enable-panel NOT_FOUND
    enable-panel HOSTS
    enable-panel OS
    enable-panel BROWSERS
    enable-panel VISIT_TIMES
    enable-panel VIRTUAL_HOSTS
    enable-panel REFERRERS
    enable-panel REFERRING_SITES
    enable-panel KEYPHRASES
    enable-panel STATUS_CODES
    enable-panel REMOTE_USER
    enable-panel CACHE_STATUS
    enable-panel GEO_LOCATION
    
    html-custom-css /config/custom.css
    html-custom-js /config/custom.js
    
  custom.css: |
    /* 企业级自定义样式 */
    .navbar-brand {
        background: url('/assets/logo.png') no-repeat;
        background-size: contain;
        width: 200px;
        height: 40px;
        text-indent: -9999px;
    }
    
    .panel-heading {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white;
    }
    
    .table-dark {
        background-color: #2c3e50;
    }
    
    .progress-bar {
        background: linear-gradient(90deg, #4CAF50, #45a049);
    }
    
  custom.js: |
    // 企业级自定义JavaScript
    document.addEventListener('DOMContentLoaded', function() {
        // 添加企业标识
        const navbar = document.querySelector('.navbar-brand');
        if (navbar) {
            navbar.setAttribute('title', 'Enterprise Analytics Dashboard');
        }
        
        // 添加实时更新指示器
        const indicator = document.createElement('div');
        indicator.id = 'live-indicator';
        indicator.innerHTML = '<span class="badge badge-success">LIVE</span>';
        indicator.style.position = 'fixed';
        indicator.style.top = '10px';
        indicator.style.right = '10px';
        indicator.style.zIndex = '9999';
        document.body.appendChild(indicator);
        
        // 添加数据导出功能
        const exportBtn = document.createElement('button');
        exportBtn.innerHTML = 'Export Data';
        exportBtn.className = 'btn btn-primary';
        exportBtn.style.position = 'fixed';
        exportBtn.style.bottom = '20px';
        exportBtn.style.right = '20px';
        exportBtn.onclick = function() {
            window.open('/api/export', '_blank');
        };
        document.body.appendChild(exportBtn);
    });
---
apiVersion: v1
kind: Secret
metadata:
  name: goaccess-secrets
  namespace: goaccess-enterprise
type: Opaque
data:
  # Base64编码的密钥
  jwt-secret: <base64-encoded-jwt-secret>
  encryption-key: <base64-encoded-encryption-key>
  database-password: <base64-encoded-db-password>
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: goaccess-data
  namespace: goaccess-enterprise
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: fast-ssd
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: goaccess-logs
  namespace: goaccess-enterprise
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 500Gi
  storageClassName: shared-storage
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: goaccess-server
  namespace: goaccess-enterprise
  labels:
    app: goaccess-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: goaccess-server
  template:
    metadata:
      labels:
        app: goaccess-server
    spec:
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 1000
      containers:
      - name: goaccess
        image: goaccess/goaccess:1.7.1
        imagePullPolicy: Always
        ports:
        - containerPort: 7890
          name: websocket
        - containerPort: 8080
          name: http
        env:
        - name: GOACCESS_CONFIG_FILE
          value: "/config/goaccess.conf"
        - name: JWT_SECRET
          valueFrom:
            secretKeyRef:
              name: goaccess-secrets
              key: jwt-secret
        - name: ENCRYPTION_KEY
          valueFrom:
            secretKeyRef:
              name: goaccess-secrets
              key: encryption-key
        volumeMounts:
        - name: config-volume
          mountPath: /config
        - name: data-volume
          mountPath: /data
        - name: logs-volume
          mountPath: /logs
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
        securityContext:
          allowPrivilegeEscalation: false
          readOnlyRootFilesystem: true
          capabilities:
            drop:
            - ALL
      - name: log-collector
        image: fluent/fluent-bit:2.0
        env:
        - name: FLUENT_CONF
          value: "fluent-bit.conf"
        volumeMounts:
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc
        - name: logs-volume
          mountPath: /logs
        resources:
          requests:
            memory: "128Mi"
            cpu: "100m"
          limits:
            memory: "256Mi"
            cpu: "200m"
      volumes:
      - name: config-volume
        configMap:
          name: goaccess-config
      - name: data-volume
        persistentVolumeClaim:
          claimName: goaccess-data
      - name: logs-volume
        persistentVolumeClaim:
          claimName: goaccess-logs
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
---
apiVersion: v1
kind: Service
metadata:
  name: goaccess-service
  namespace: goaccess-enterprise
  labels:
    app: goaccess-server
spec:
  selector:
    app: goaccess-server
  ports:
  - name: websocket
    port: 7890
    targetPort: 7890
    protocol: TCP
  - name: http
    port: 8080
    targetPort: 8080
    protocol: TCP
  type: ClusterIP
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: goaccess-ingress
  namespace: goaccess-enterprise
  annotations:
    kubernetes.io/ingress.class: "nginx"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    nginx.ingress.kubernetes.io/websocket-services: "goaccess-service"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/auth-type: basic
    nginx.ingress.kubernetes.io/auth-secret: goaccess-auth
    nginx.ingress.kubernetes.io/auth-realm: 'Authentication Required - GoAccess Enterprise'
spec:
  tls:
  - hosts:
    - goaccess.example.com
    secretName: goaccess-tls
  rules:
  - host: goaccess.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: goaccess-service
            port:
              number: 8080
      - path: /ws
        pathType: Prefix
        backend:
          service:
            name: goaccess-service
            port:
              number: 7890
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: goaccess-enterprise
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020
    
    [INPUT]
        Name              tail
        Path              /logs/*.log
        Parser            nginx
        Tag               nginx.access
        Refresh_Interval  5
        Mem_Buf_Limit     50MB
    
    [FILTER]
        Name                kubernetes
        Match               nginx.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           On
        Keep_Log            Off
    
    [OUTPUT]
        Name  forward
        Match *
        Host  goaccess-service
        Port  24224
        
  parsers.conf: |
    [PARSER]
        Name        nginx
        Format      regex
        Regex       ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key    time
        Time_Format %d/%b/%Y:%H:%M:%S %z
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: goaccess-hpa
  namespace: goaccess-enterprise
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: goaccess-server
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: goaccess-pdb
  namespace: goaccess-enterprise
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: goaccess-server
---
apiVersion: v1
kind: Secret
metadata:
  name: goaccess-auth
  namespace: goaccess-enterprise
type: Opaque
data:
  # htpasswd格式的用户认证信息
  auth: YWRtaW46JGFwcjEkSDY1dnVhJE5oVjNIUzFOOXFSbXNyVGNOWjhOZjE=
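
需要注意,上面 Deployment 中的存活/就绪探针假设容器在 8080 端口提供 /health 与 /ready 接口,而官方 GoAccess 镜像本身并不提供这类 HTTP 健康检查端点。下面是一个最小化的探针辅助脚本示意(纯属假设性示例,可作为 sidecar 或入口脚本的一部分运行,这里以报告文件是否已生成作为就绪条件):

#!/usr/bin/env python3
# health_probe.py - 为 liveness/readiness 探针提供 /health 与 /ready 端点(示意)

from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

# 假设 GoAccess 将报告输出到该路径(路径为演示用的假设)
REPORT_PATH = Path('/data/report.html')

class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/health':
            self._respond(200, b'ok')
        elif self.path == '/ready':
            # 报告文件已生成即视为就绪(示例判定条件)
            if REPORT_PATH.exists():
                self._respond(200, b'ready')
            else:
                self._respond(503, b'not ready')
        else:
            self._respond(404, b'not found')

    def _respond(self, code: int, body: bytes):
        self.send_response(code)
        self.send_header('Content-Type', 'text/plain')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

if __name__ == '__main__':
    HTTPServer(('0.0.0.0', 8080), ProbeHandler).serve_forever()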

5.4.2 Docker Swarm部署配置

# docker-swarm-stack.yml - Docker Swarm部署配置
version: '3.8'

services:
  goaccess:
    image: goaccess/goaccess:1.7.1
    deploy:
      replicas: 3
      placement:
        constraints:
          - node.role == worker
        preferences:
          - spread: node.labels.zone
      resources:
        limits:
          cpus: '2.0'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 512M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
        window: 120s
      update_config:
        parallelism: 1
        delay: 10s
        failure_action: rollback
        monitor: 60s
        max_failure_ratio: 0.3
      rollback_config:
        parallelism: 1
        delay: 5s
        failure_action: pause
        monitor: 60s
        max_failure_ratio: 0.3
    environment:
      - GOACCESS_CONFIG_FILE=/config/goaccess.conf
      - JWT_SECRET_FILE=/run/secrets/jwt_secret
      - ENCRYPTION_KEY_FILE=/run/secrets/encryption_key
    configs:
      - source: goaccess_config
        target: /config/goaccess.conf
      - source: custom_css
        target: /config/custom.css
      - source: custom_js
        target: /config/custom.js
    secrets:
      - jwt_secret
      - encryption_key
      - db_password
    volumes:
      - goaccess_data:/data
      - goaccess_logs:/logs:ro
    ports:
      - "7890:7890"
      - "8080:8080"
    networks:
      - goaccess_network
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    logging:
      driver: "json-file"
      options:
        max-size: "100m"
        max-file: "3"
        labels: "service=goaccess"

  nginx:
    image: nginx:alpine
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == worker
      resources:
        limits:
          cpus: '1.0'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 128M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
    configs:
      - source: nginx_config
        target: /etc/nginx/nginx.conf
      - source: nginx_ssl_config
        target: /etc/nginx/conf.d/ssl.conf
    secrets:
      - source: ssl_cert
        target: /etc/ssl/certs/goaccess.crt
      - source: ssl_key
        target: /etc/ssl/private/goaccess.key
    ports:
      - "80:80"
      - "443:443"
    networks:
      - goaccess_network
    depends_on:
      - goaccess
    healthcheck:
      test: ["CMD", "wget", "--quiet", "--tries=1", "--spider", "http://localhost/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  redis:
    image: redis:7-alpine
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.25'
          memory: 256M
      restart_policy:
        condition: on-failure
    command: redis-server --requirepass ${REDIS_PASSWORD} --maxmemory 512mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    networks:
      - goaccess_network
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 30s
      timeout: 10s
      retries: 3

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
    deploy:
      replicas: 3
      placement:
        constraints:
          - node.role == worker
        preferences:
          - spread: node.labels.zone
      resources:
        limits:
          cpus: '2.0'
          memory: 4G
        reservations:
          cpus: '1.0'
          memory: 2G
      restart_policy:
        condition: on-failure
    environment:
      - cluster.name=goaccess-cluster
      - node.name={{.Node.Hostname}}
      - discovery.seed_hosts=elasticsearch
      - cluster.initial_master_nodes=elasticsearch
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false
      - xpack.security.transport.ssl.enabled=false
    volumes:
      - elasticsearch_data:/usr/share/elasticsearch/data
    networks:
      - goaccess_network
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9200/_cluster/health || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  logstash:
    image: docker.elastic.co/logstash/logstash:8.8.0
    deploy:
      replicas: 2
      placement:
        constraints:
          - node.role == worker
      resources:
        limits:
          cpus: '1.5'
          memory: 2G
        reservations:
          cpus: '0.5'
          memory: 1G
      restart_policy:
        condition: on-failure
    configs:
      - source: logstash_config
        target: /usr/share/logstash/pipeline/logstash.conf
      - source: logstash_yml
        target: /usr/share/logstash/config/logstash.yml
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - goaccess_logs:/logs:ro
    networks:
      - goaccess_network
    depends_on:
      - elasticsearch
    healthcheck:
      test: ["CMD-SHELL", "curl -f http://localhost:9600 || exit 1"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 60s

  prometheus:
    image: prom/prometheus:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.25'
          memory: 256M
    configs:
      - source: prometheus_config
        target: /etc/prometheus/prometheus.yml
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
    volumes:
      - prometheus_data:/prometheus
    networks:
      - goaccess_network
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:latest
    deploy:
      replicas: 1
      placement:
        constraints:
          - node.role == manager
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.25'
          memory: 256M
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD}
      - GF_USERS_ALLOW_SIGN_UP=false
      - GF_INSTALL_PLUGINS=grafana-piechart-panel,grafana-worldmap-panel
    volumes:
      - grafana_data:/var/lib/grafana
    networks:
      - goaccess_network
    ports:
      - "3000:3000"
    depends_on:
      - prometheus

networks:
  goaccess_network:
    driver: overlay
    attachable: true
    ipam:
      config:
        - subnet: 10.0.9.0/24

volumes:
  goaccess_data:
    driver: local
    driver_opts:
      type: nfs
      o: addr=nfs-server.example.com,rw
      device: ":/data/goaccess"
  goaccess_logs:
    driver: local
    driver_opts:
      type: nfs
      o: addr=nfs-server.example.com,ro
      device: ":/logs"
  redis_data:
    driver: local
  elasticsearch_data:
    driver: local
  prometheus_data:
    driver: local
  grafana_data:
    driver: local

configs:
  goaccess_config:
    external: true
  custom_css:
    external: true
  custom_js:
    external: true
  nginx_config:
    external: true
  nginx_ssl_config:
    external: true
  logstash_config:
    external: true
  logstash_yml:
    external: true
  prometheus_config:
    external: true

secrets:
  jwt_secret:
    external: true
  encryption_key:
    external: true
  db_password:
    external: true
  ssl_cert:
    external: true
  ssl_key:
    external: true
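
上面的 goaccess 服务通过 JWT_SECRET_FILE、ENCRYPTION_KEY_FILE 等环境变量指向 Docker secrets 的挂载路径(/run/secrets/*)。GoAccess 本身不会读取这些变量,通常由入口脚本或旁路的安全组件负责加载。下面是一个读取 *_FILE 形式机密的最小示意脚本(变量名与路径沿用上面栈定义中的约定,属于假设性示例而非固定 API):

#!/usr/bin/env python3
# load_secrets_example.py - 从 Docker secrets 文件加载机密(示意)

import os
from pathlib import Path

def read_secret(env_name: str, default: str = '') -> str:
    """优先读取 <ENV>_FILE 指向的 secrets 文件,否则回退到同名环境变量"""
    file_path = os.environ.get(f'{env_name}_FILE')
    if file_path and Path(file_path).is_file():
        return Path(file_path).read_text(encoding='utf-8').strip()
    return os.environ.get(env_name, default)

if __name__ == '__main__':
    jwt_secret = read_secret('JWT_SECRET')
    encryption_key = read_secret('ENCRYPTION_KEY')
    # 实际部署中可将这些值注入安全组件(如前文的 SecurityManager)或入口脚本
    print('JWT secret 已加载' if jwt_secret else '未找到 JWT secret')
    print('加密密钥已加载' if encryption_key else '未找到加密密钥')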

6. 总结

本章详细介绍了GoAccess的高级应用场景和企业级部署方案,涵盖了以下关键内容:

6.1 企业级架构设计

  • 分布式部署架构:多节点部署、Docker Swarm和Kubernetes部署方案
  • 高可用性配置:故障转移管理、负载均衡和数据备份
  • 大规模日志处理:分布式日志收集、实时流处理和数据管道

6.2 容器化与云服务集成

  • Docker容器化部署:完整的容器化解决方案
  • Kubernetes编排:生产级别的K8s部署配置
  • 云服务集成:AWS、GCP、Azure等主流云平台集成

6.3 实时数据处理

  • Kafka流处理:大规模实时日志流处理
  • 实时数据管道:可扩展的数据处理架构
  • 性能监控:Prometheus指标集成和监控

6.4 安全与合规

  • 数据安全管理:加密、匿名化、访问控制
  • 合规性管理:GDPR、CCPA等法规遵循
  • 审计日志:完整的操作审计和追踪

6.5 企业级部署配置

  • Kubernetes部署:完整的生产级K8s配置
  • Docker Swarm部署:高可用的Swarm集群配置
  • 监控和告警:集成Prometheus、Grafana监控栈

通过本章的学习,您已经掌握了GoAccess在企业环境中的高级应用和部署技能。这些配置和代码示例为您在生产环境中部署和管理GoAccess提供了完整的解决方案。

下一章我们将学习GoAccess的最佳实践和案例分析,帮助您更好地应用这些高级特性。


#### 5.3.2 合规性管理

```python
#!/usr/bin/env python3
# compliance_manager.py - 合规性管理器

import json
import logging
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
import hashlib
import uuid
import os
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ComplianceStandard(Enum):
    """合规标准"""
    GDPR = "gdpr"  # 欧盟通用数据保护条例
    CCPA = "ccpa"  # 加州消费者隐私法
    HIPAA = "hipaa"  # 健康保险便携性和责任法案
    SOX = "sox"  # 萨班斯-奥克斯利法案
    PCI_DSS = "pci_dss"  # 支付卡行业数据安全标准
    ISO27001 = "iso27001"  # ISO 27001信息安全管理

class DataCategory(Enum):
    """数据类别"""
    PERSONAL = "personal"  # 个人数据
    SENSITIVE = "sensitive"  # 敏感数据
    FINANCIAL = "financial"  # 财务数据
    HEALTH = "health"  # 健康数据
    BIOMETRIC = "biometric"  # 生物识别数据
    PUBLIC = "public"  # 公开数据

@dataclass
class DataProcessingRecord:
    """数据处理记录"""
    id: str
    timestamp: datetime
    data_category: DataCategory
    processing_purpose: str
    legal_basis: str
    data_subject: str
    retention_period: int  # 保留期限(天)
    anonymized: bool
    encrypted: bool
    access_log: List[Dict]
    compliance_standards: List[ComplianceStandard]

@dataclass
class ConsentRecord:
    """同意记录"""
    id: str
    data_subject: str
    consent_type: str
    granted_at: datetime
    expires_at: Optional[datetime]
    withdrawn_at: Optional[datetime]
    purposes: List[str]
    legal_basis: str
    consent_method: str  # 同意方式
    ip_address: str
    user_agent: str

@dataclass
class DataBreachIncident:
    """数据泄露事件"""
    id: str
    detected_at: datetime
    reported_at: Optional[datetime]
    incident_type: str
    affected_records: int
    data_categories: List[DataCategory]
    severity: str  # low, medium, high, critical
    description: str
    mitigation_actions: List[str]
    notification_required: bool
    authorities_notified: bool
    subjects_notified: bool

class ComplianceManager:
    """合规性管理器"""
    
    def __init__(self, config_file: str, db_path: str = "compliance.db"):
        self.config = self.load_config(config_file)
        self.db_path = db_path
        self.init_database()
        self.compliance_rules = self.load_compliance_rules()
        
    def load_config(self, config_file: str) -> Dict:
        """加载配置文件"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"加载合规配置失败: {e}")
            return {}
    
    def init_database(self):
        """初始化数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 数据处理记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_processing_records (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                data_category TEXT NOT NULL,
                processing_purpose TEXT NOT NULL,
                legal_basis TEXT NOT NULL,
                data_subject TEXT NOT NULL,
                retention_period INTEGER NOT NULL,
                anonymized BOOLEAN NOT NULL,
                encrypted BOOLEAN NOT NULL,
                access_log TEXT,
                compliance_standards TEXT
            )
        ''')
        
        # 同意记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS consent_records (
                id TEXT PRIMARY KEY,
                data_subject TEXT NOT NULL,
                consent_type TEXT NOT NULL,
                granted_at TEXT NOT NULL,
                expires_at TEXT,
                withdrawn_at TEXT,
                purposes TEXT,
                legal_basis TEXT NOT NULL,
                consent_method TEXT NOT NULL,
                ip_address TEXT,
                user_agent TEXT
            )
        ''')
        
        # 数据泄露事件表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_breach_incidents (
                id TEXT PRIMARY KEY,
                detected_at TEXT NOT NULL,
                reported_at TEXT,
                incident_type TEXT NOT NULL,
                affected_records INTEGER NOT NULL,
                data_categories TEXT,
                severity TEXT NOT NULL,
                description TEXT,
                mitigation_actions TEXT,
                notification_required BOOLEAN NOT NULL,
                authorities_notified BOOLEAN NOT NULL,
                subjects_notified BOOLEAN NOT NULL
            )
        ''')
        
        # 审计日志表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS audit_logs (
                id TEXT PRIMARY KEY,
                timestamp TEXT NOT NULL,
                event_type TEXT NOT NULL,
                user_id TEXT,
                resource_id TEXT,
                action TEXT NOT NULL,
                result TEXT NOT NULL,
                details TEXT,
                ip_address TEXT,
                user_agent TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def load_compliance_rules(self) -> Dict:
        """加载合规规则"""
        return {
            ComplianceStandard.GDPR: {
                'data_retention_max_days': 2555,  # 7年
                'consent_required': True,
                'right_to_erasure': True,
                'data_portability': True,
                'breach_notification_hours': 72,
                'dpo_required': True,
                'privacy_by_design': True
            },
            ComplianceStandard.CCPA: {
                'data_retention_max_days': 365,  # 1年
                'opt_out_required': True,
                'data_disclosure': True,
                'data_deletion': True,
                'non_discrimination': True
            },
            ComplianceStandard.HIPAA: {
                'data_retention_max_days': 2190,  # 6年
                'encryption_required': True,
                'access_controls': True,
                'audit_logs': True,
                'breach_notification_days': 60
            },
            ComplianceStandard.PCI_DSS: {
                'data_retention_max_days': 365,
                'encryption_required': True,
                'access_controls': True,
                'vulnerability_scanning': True,
                'penetration_testing': True
            }
        }
    
    def record_data_processing(self, record: DataProcessingRecord) -> bool:
        """记录数据处理活动"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO data_processing_records (
                    id, timestamp, data_category, processing_purpose, legal_basis,
                    data_subject, retention_period, anonymized, encrypted,
                    access_log, compliance_standards
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                record.id,
                record.timestamp.isoformat(),
                record.data_category.value,
                record.processing_purpose,
                record.legal_basis,
                record.data_subject,
                record.retention_period,
                record.anonymized,
                record.encrypted,
                json.dumps(record.access_log),
                json.dumps([std.value for std in record.compliance_standards])
            ))
            
            conn.commit()
            conn.close()
            
            # 记录审计日志
            self.log_audit_event(
                event_type='data_processing',
                action='record_created',
                resource_id=record.id,
                details={'data_category': record.data_category.value}
            )
            
            return True
            
        except Exception as e:
            logger.error(f"记录数据处理活动失败: {e}")
            return False
    
    def record_consent(self, consent: ConsentRecord) -> bool:
        """记录用户同意"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO consent_records (
                    id, data_subject, consent_type, granted_at, expires_at,
                    withdrawn_at, purposes, legal_basis, consent_method,
                    ip_address, user_agent
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                consent.id,
                consent.data_subject,
                consent.consent_type,
                consent.granted_at.isoformat(),
                consent.expires_at.isoformat() if consent.expires_at else None,
                consent.withdrawn_at.isoformat() if consent.withdrawn_at else None,
                json.dumps(consent.purposes),
                consent.legal_basis,
                consent.consent_method,
                consent.ip_address,
                consent.user_agent
            ))
            
            conn.commit()
            conn.close()
            
            # 记录审计日志
            self.log_audit_event(
                event_type='consent',
                action='consent_granted',
                resource_id=consent.id,
                details={'data_subject': consent.data_subject}
            )
            
            return True
            
        except Exception as e:
            logger.error(f"记录用户同意失败: {e}")
            return False
    
    def withdraw_consent(self, consent_id: str, withdrawal_reason: str = None) -> bool:
        """撤回同意"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                UPDATE consent_records 
                SET withdrawn_at = ? 
                WHERE id = ? AND withdrawn_at IS NULL
            ''', (datetime.now().isoformat(), consent_id))
            
            if cursor.rowcount > 0:
                conn.commit()
                
                # 记录审计日志
                self.log_audit_event(
                    event_type='consent',
                    action='consent_withdrawn',
                    resource_id=consent_id,
                    details={'reason': withdrawal_reason}
                )
                
                logger.info(f"同意已撤回: {consent_id}")
                conn.close()
                return True
            else:
                conn.close()
                return False
                
        except Exception as e:
            logger.error(f"撤回同意失败: {e}")
            return False
    
    def record_data_breach(self, incident: DataBreachIncident) -> bool:
        """记录数据泄露事件"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                INSERT INTO data_breach_incidents (
                    id, detected_at, reported_at, incident_type, affected_records,
                    data_categories, severity, description, mitigation_actions,
                    notification_required, authorities_notified, subjects_notified
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                incident.id,
                incident.detected_at.isoformat(),
                incident.reported_at.isoformat() if incident.reported_at else None,
                incident.incident_type,
                incident.affected_records,
                json.dumps([cat.value for cat in incident.data_categories]),
                incident.severity,
                incident.description,
                json.dumps(incident.mitigation_actions),
                incident.notification_required,
                incident.authorities_notified,
                incident.subjects_notified
            ))
            
            conn.commit()
            conn.close()
            
            # 记录审计日志
            self.log_audit_event(
                event_type='data_breach',
                action='incident_recorded',
                resource_id=incident.id,
                details={
                    'severity': incident.severity,
                    'affected_records': incident.affected_records
                }
            )
            
            # 检查是否需要自动通知
            self.check_breach_notification_requirements(incident)
            
            return True
            
        except Exception as e:
            logger.error(f"记录数据泄露事件失败: {e}")
            return False
    
    def check_breach_notification_requirements(self, incident: DataBreachIncident):
        """检查泄露通知要求"""
        # GDPR要求72小时内通知监管机构
        if ComplianceStandard.GDPR in [ComplianceStandard(std) for std in self.config.get('applicable_standards', [])]:
            if incident.severity in ['high', 'critical']:
                notification_deadline = incident.detected_at + timedelta(hours=72)
                if datetime.now() < notification_deadline:
                    logger.warning(f"GDPR通知要求: 事件 {incident.id} 需要在 {notification_deadline} 前通知监管机构")
        
        # HIPAA要求60天内通知
        if ComplianceStandard.HIPAA in [ComplianceStandard(std) for std in self.config.get('applicable_standards', [])]:
            if DataCategory.HEALTH in incident.data_categories:
                notification_deadline = incident.detected_at + timedelta(days=60)
                if datetime.now() < notification_deadline:
                    logger.warning(f"HIPAA通知要求: 事件 {incident.id} 需要在 {notification_deadline} 前通知相关方")
    
    def log_audit_event(self, event_type: str, action: str, user_id: str = None, 
                       resource_id: str = None, result: str = 'success', 
                       details: Dict = None, ip_address: str = None, 
                       user_agent: str = None):
        """记录审计事件"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            audit_id = str(uuid.uuid4())
            
            cursor.execute('''
                INSERT INTO audit_logs (
                    id, timestamp, event_type, user_id, resource_id,
                    action, result, details, ip_address, user_agent
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                audit_id,
                datetime.now().isoformat(),
                event_type,
                user_id,
                resource_id,
                action,
                result,
                json.dumps(details) if details else None,
                ip_address,
                user_agent
            ))
            
            conn.commit()
            conn.close()
            
        except Exception as e:
            logger.error(f"记录审计事件失败: {e}")
    
    def check_data_retention_compliance(self) -> List[Dict]:
        """检查数据保留合规性"""
        violations = []
        
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            cursor.execute('''
                SELECT id, timestamp, data_category, retention_period, compliance_standards
                FROM data_processing_records
            ''')
            
            records = cursor.fetchall()
            conn.close()
            
            for record in records:
                record_id, timestamp_str, data_category, retention_period, standards_str = record
                timestamp = datetime.fromisoformat(timestamp_str)
                standards = json.loads(standards_str)
                
                # 检查是否超过保留期限
                if datetime.now() > timestamp + timedelta(days=retention_period):
                    violations.append({
                        'record_id': record_id,
                        'violation_type': 'retention_period_exceeded',
                        'data_category': data_category,
                        'retention_period': retention_period,
                        'age_days': (datetime.now() - timestamp).days,
                        'standards': standards
                    })
                
                # 检查合规标准要求
                for standard_str in standards:
                    standard = ComplianceStandard(standard_str)
                    if standard in self.compliance_rules:
                        max_retention = self.compliance_rules[standard]['data_retention_max_days']
                        if retention_period > max_retention:
                            violations.append({
                                'record_id': record_id,
                                'violation_type': 'retention_period_too_long',
                                'data_category': data_category,
                                'retention_period': retention_period,
                                'max_allowed': max_retention,
                                'standard': standard_str
                            })
            
            return violations
            
        except Exception as e:
            logger.error(f"检查数据保留合规性失败: {e}")
            return []
    
    def generate_compliance_report(self, standard: ComplianceStandard, 
                                 start_date: datetime = None, 
                                 end_date: datetime = None) -> Dict:
        """生成合规报告"""
        if start_date is None:
            start_date = datetime.now() - timedelta(days=30)
        if end_date is None:
            end_date = datetime.now()
        
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 数据处理统计
            cursor.execute('''
                SELECT data_category, COUNT(*) as count
                FROM data_processing_records
                WHERE timestamp BETWEEN ? AND ?
                AND compliance_standards LIKE ?
                GROUP BY data_category
            ''', (start_date.isoformat(), end_date.isoformat(), f'%{standard.value}%'))
            
            processing_stats = dict(cursor.fetchall())
            
            # 同意统计
            cursor.execute('''
                SELECT consent_type, COUNT(*) as count
                FROM consent_records
                WHERE granted_at BETWEEN ? AND ?
                GROUP BY consent_type
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            consent_stats = dict(cursor.fetchall())
            
            # 撤回同意统计
            cursor.execute('''
                SELECT COUNT(*) as count
                FROM consent_records
                WHERE withdrawn_at BETWEEN ? AND ?
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            withdrawn_consents = cursor.fetchone()[0]
            
            # 数据泄露事件
            cursor.execute('''
                SELECT severity, COUNT(*) as count
                FROM data_breach_incidents
                WHERE detected_at BETWEEN ? AND ?
                GROUP BY severity
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            breach_stats = dict(cursor.fetchall())
            
            # 审计事件统计
            cursor.execute('''
                SELECT event_type, COUNT(*) as count
                FROM audit_logs
                WHERE timestamp BETWEEN ? AND ?
                GROUP BY event_type
            ''', (start_date.isoformat(), end_date.isoformat()))
            
            audit_stats = dict(cursor.fetchall())
            
            conn.close()
            
            # 检查合规性违规
            violations = self.check_data_retention_compliance()
            
            report = {
                'standard': standard.value,
                'report_period': {
                    'start_date': start_date.isoformat(),
                    'end_date': end_date.isoformat()
                },
                'data_processing': {
                    'total_records': sum(processing_stats.values()),
                    'by_category': processing_stats
                },
                'consent_management': {
                    'total_consents': sum(consent_stats.values()),
                    'by_type': consent_stats,
                    'withdrawn_consents': withdrawn_consents
                },
                'data_breaches': {
                    'total_incidents': sum(breach_stats.values()),
                    'by_severity': breach_stats
                },
                'audit_activity': {
                    'total_events': sum(audit_stats.values()),
                    'by_type': audit_stats
                },
                'compliance_violations': {
                    'total_violations': len(violations),
                    'violations': violations
                },
                'compliance_score': self.calculate_compliance_score(standard, violations),
                'recommendations': self.generate_compliance_recommendations(standard, violations)
            }
            
            return report
            
        except Exception as e:
            logger.error(f"生成合规报告失败: {e}")
            return {}
    
    def calculate_compliance_score(self, standard: ComplianceStandard, violations: List[Dict]) -> float:
        """计算合规分数"""
        base_score = 100.0
        
        # 根据违规数量和严重程度扣分
        for violation in violations:
            if violation['violation_type'] == 'retention_period_exceeded':
                base_score -= 5.0
            elif violation['violation_type'] == 'retention_period_too_long':
                base_score -= 3.0
        
        return max(0.0, base_score)
    
    def generate_compliance_recommendations(self, standard: ComplianceStandard, 
                                          violations: List[Dict]) -> List[str]:
        """生成合规建议"""
        recommendations = []
        
        if violations:
            recommendations.append("立即处理数据保留违规问题")
            recommendations.append("建立自动化数据清理流程")
            recommendations.append("定期审查数据保留策略")
        
        if standard == ComplianceStandard.GDPR:
            recommendations.extend([
                "确保所有数据处理都有合法依据",
                "实施数据主体权利响应流程",
                "进行数据保护影响评估"
            ])
        elif standard == ComplianceStandard.HIPAA:
            recommendations.extend([
                "加强访问控制和身份验证",
                "实施端到端加密",
                "定期进行安全风险评估"
            ])
        
        return recommendations
    
    def export_data_for_subject(self, data_subject: str) -> Dict:
        """导出数据主体的所有数据(数据可携带权)"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 数据处理记录
            cursor.execute('''
                SELECT * FROM data_processing_records
                WHERE data_subject = ?
            ''', (data_subject,))
            
            processing_records = [dict(zip([col[0] for col in cursor.description], row)) 
                                for row in cursor.fetchall()]
            
            # 同意记录
            cursor.execute('''
                SELECT * FROM consent_records
                WHERE data_subject = ?
            ''', (data_subject,))
            
            consent_records = [dict(zip([col[0] for col in cursor.description], row)) 
                             for row in cursor.fetchall()]
            
            conn.close()
            
            export_data = {
                'data_subject': data_subject,
                'export_timestamp': datetime.now().isoformat(),
                'data_processing_records': processing_records,
                'consent_records': consent_records
            }
            
            # 记录审计日志
            self.log_audit_event(
                event_type='data_export',
                action='data_exported',
                resource_id=data_subject,
                details={'record_count': len(processing_records) + len(consent_records)}
            )
            
            return export_data
            
        except Exception as e:
            logger.error(f"导出数据主体数据失败: {e}")
            return {}
    
    def delete_data_for_subject(self, data_subject: str, reason: str = "data_subject_request") -> bool:
        """删除数据主体的所有数据(被遗忘权)"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            
            # 删除数据处理记录
            cursor.execute('DELETE FROM data_processing_records WHERE data_subject = ?', (data_subject,))
            processing_deleted = cursor.rowcount
            
            # 删除同意记录
            cursor.execute('DELETE FROM consent_records WHERE data_subject = ?', (data_subject,))
            consent_deleted = cursor.rowcount
            
            conn.commit()
            conn.close()
            
            # 记录审计日志
            self.log_audit_event(
                event_type='data_deletion',
                action='data_deleted',
                resource_id=data_subject,
                details={
                    'reason': reason,
                    'processing_records_deleted': processing_deleted,
                    'consent_records_deleted': consent_deleted
                }
            )
            
            logger.info(f"已删除数据主体 {data_subject} 的所有数据")
            return True
            
        except Exception as e:
            logger.error(f"删除数据主体数据失败: {e}")
            return False

# 使用示例
if __name__ == '__main__':
    # 创建合规管理器
    compliance_manager = ComplianceManager('compliance_config.json')
    
    # 记录数据处理活动
    processing_record = DataProcessingRecord(
        id=str(uuid.uuid4()),
        timestamp=datetime.now(),
        data_category=DataCategory.PERSONAL,
        processing_purpose="网站访问分析",
        legal_basis="合法利益",
        data_subject="user123",
        retention_period=365,
        anonymized=True,
        encrypted=True,
        access_log=[],
        compliance_standards=[ComplianceStandard.GDPR]
    )
    
    compliance_manager.record_data_processing(processing_record)
    
    # 记录用户同意
    consent_record = ConsentRecord(
        id=str(uuid.uuid4()),
        data_subject="user123",
        consent_type="analytics",
        granted_at=datetime.now(),
        expires_at=datetime.now() + timedelta(days=365),
        withdrawn_at=None,
        purposes=["网站分析", "性能优化"],
        legal_basis="同意",
        consent_method="网页表单",
        ip_address="192.168.1.100",
        user_agent="Mozilla/5.0..."
    )
    
    compliance_manager.record_consent(consent_record)
    
    # 生成GDPR合规报告
    report = compliance_manager.generate_compliance_report(ComplianceStandard.GDPR)
    print("GDPR合规报告:")
    print(json.dumps(report, indent=2, ensure_ascii=False))
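    # 补充示例(示意性质):演示数据可携带权与被遗忘权的调用方式,
    # 复用上文定义的 export_data_for_subject / delete_data_for_subject,变量沿用本例中的 compliance_manager
    subject_export = compliance_manager.export_data_for_subject("user123")
    total_records = len(subject_export.get('data_processing_records', [])) + \
                    len(subject_export.get('consent_records', []))
    print(f"导出记录条数: {total_records}")

    # 收到数据主体的删除请求后,删除其全部数据(删除操作会自动写入审计日志)
    compliance_manager.delete_data_for_subject("user123", reason="data_subject_request")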

5.4 企业级部署配置

5.4.1 Kubernetes部署

# k8s-deployment.yaml - Kubernetes部署配置
apiVersion: v1
kind: Namespace
metadata:
  name: goaccess-enterprise
  labels:
    name: goaccess-enterprise
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: goaccess-config
  namespace: goaccess-enterprise
data:
  goaccess.conf: |
    time-format %H:%M:%S
    date-format %d/%b/%Y
    log-format COMBINED
    
    real-time-html true
    ws-url wss://goaccess.example.com/ws
    
    persist true
    restore true
    db-path /data/goaccess.db
    
    geoip-database /usr/share/GeoIP/GeoLite2-City.mmdb
    
    html-prefs '{"theme":"bright","perPage":10}'
    html-report-title "Enterprise Web Analytics"
    
    exclude-ip 127.0.0.1
    exclude-ip 10.0.0.0/8
    exclude-ip 172.16.0.0/12
    exclude-ip 192.168.0.0/16
    
    ignore-crawlers true
    crawlers-only false
    
    enable-panel VISITORS
    enable-panel REQUESTS
    enable-panel REQUESTS_STATIC
    enable-panel NOT_FOUND
    enable-panel HOSTS
    enable-panel OS
    enable-panel BROWSERS
    enable-panel VISIT_TIMES
    enable-panel VIRTUAL_HOSTS
    enable-panel REFERRERS
    enable-panel REFERRING_SITES
    enable-panel KEYPHRASES
    enable-panel STATUS_CODES
    enable-panel REMOTE_USER
    enable-panel CACHE_STATUS
    enable-panel GEOLOCATION
    enable-panel MIME_TYPE
    enable-panel TLS_TYPE
    
  pipeline.yaml: |
    kafka:
      bootstrap_servers:
        - kafka-cluster:9092
      topics:
        input: "access-logs"
        output: "processed-logs"
        alerts: "security-alerts"
      
    redis:
      host: "redis-cluster"
      port: 6379
      db: 0
      
    elasticsearch:
      hosts:
        - "elasticsearch-cluster:9200"
      index_prefix: "goaccess"
      
    processing:
      batch_size: 1000
      flush_interval: 30
      max_retries: 3
      
    security:
      encryption_enabled: true
      anonymization_enabled: true
      rate_limit_per_minute: 10000
      
    monitoring:
      prometheus_port: 8000
      health_check_port: 8080
---
apiVersion: v1
kind: Secret
metadata:
  name: goaccess-secrets
  namespace: goaccess-enterprise
type: Opaque
data:
  # Base64编码的密钥
  encryption-key: <base64-encoded-encryption-key>
  jwt-secret: <base64-encoded-jwt-secret>
  database-password: <base64-encoded-db-password>
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: goaccess-data
  namespace: goaccess-enterprise
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: fast-ssd
---
apiVersion: v1
kind: Service
metadata:
  name: goaccess-service
  namespace: goaccess-enterprise
  labels:
    app: goaccess
spec:
  selector:
    app: goaccess
  ports:
    - name: http
      port: 80
      targetPort: 7890
    - name: websocket
      port: 7890
      targetPort: 7890
    - name: metrics
      port: 8000
      targetPort: 8000
  type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: goaccess-deployment
  namespace: goaccess-enterprise
  labels:
    app: goaccess
spec:
  replicas: 3
  selector:
    matchLabels:
      app: goaccess
  template:
    metadata:
      labels:
        app: goaccess
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "8000"
        prometheus.io/path: "/metrics"
    spec:
      containers:
      - name: goaccess
        image: goaccess/goaccess:latest
        ports:
        - containerPort: 7890
          name: http
        - containerPort: 8000
          name: metrics
        env:
        - name: GOACCESS_CONFIG_FILE
          value: "/etc/goaccess/goaccess.conf"
        - name: ENCRYPTION_KEY
          valueFrom:
            secretKeyRef:
              name: goaccess-secrets
              key: encryption-key
        - name: JWT_SECRET
          valueFrom:
            secretKeyRef:
              name: goaccess-secrets
              key: jwt-secret
        volumeMounts:
        - name: config-volume
          mountPath: /etc/goaccess
        - name: data-volume
          mountPath: /data
        - name: logs-volume
          mountPath: /var/log/nginx
          readOnly: true
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
      - name: log-collector
        image: fluent/fluent-bit:latest
        env:
        - name: KAFKA_BROKERS
          value: "kafka-cluster:9092"
        volumeMounts:
        - name: logs-volume
          mountPath: /var/log/nginx
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc
      - name: security-manager
        image: goaccess/security-manager:latest
        ports:
        - containerPort: 8080
          name: health
        env:
        - name: CONFIG_FILE
          value: "/etc/security/config.json"
        - name: ENCRYPTION_KEY
          valueFrom:
            secretKeyRef:
              name: goaccess-secrets
              key: encryption-key
        volumeMounts:
        - name: security-config
          mountPath: /etc/security
        resources:
          requests:
            memory: "256Mi"
            cpu: "100m"
          limits:
            memory: "512Mi"
            cpu: "500m"
      volumes:
      - name: config-volume
        configMap:
          name: goaccess-config
      - name: data-volume
        persistentVolumeClaim:
          claimName: goaccess-data
      - name: logs-volume
        hostPath:
          path: /var/log/nginx
          type: Directory
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
      - name: security-config
        configMap:
          name: security-config
      nodeSelector:
        node-type: compute
      tolerations:
      - key: "compute-node"
        operator: "Equal"
        value: "true"
        effect: "NoSchedule"
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: goaccess-ingress
  namespace: goaccess-enterprise
  annotations:
    kubernetes.io/ingress.class: "nginx"
    cert-manager.io/cluster-issuer: "letsencrypt-prod"
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
    nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
    nginx.ingress.kubernetes.io/websocket-services: "goaccess-service"
spec:
  tls:
  - hosts:
    - goaccess.example.com
    secretName: goaccess-tls
  rules:
  - host: goaccess.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: goaccess-service
            port:
              number: 80
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: goaccess-pdb
  namespace: goaccess-enterprise
spec:
  minAvailable: 2
  selector:
    matchLabels:
      app: goaccess
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: goaccess-hpa
  namespace: goaccess-enterprise
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: goaccess-deployment
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
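
需要注意,上面 Deployment 中的 livenessProbe/readinessProbe 指向 8080 端口的 /health 与 /ready,而 goaccess 容器本身只声明了 7890(HTTP/WebSocket)与 8000(metrics)两个端口,GoAccess 自身也不提供这类健康检查接口;探针要生效,需要由某个容器(例如此处 security-manager 这类辅助组件)在 8080 端口提供对应端点。下面给出一个仅作示意的最小实现草稿(基于 Python 标准库,路径与端口取自上文探针配置,其余均为假设):

#!/usr/bin/env python3
# health_endpoint.py - 示意性的健康/就绪检查端点(假设性实现,仅用于说明探针约定)
from http.server import BaseHTTPRequestHandler, HTTPServer

class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # /health 对应 livenessProbe,/ready 对应 readinessProbe
        if self.path in ('/health', '/ready'):
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain')
            self.end_headers()
            self.wfile.write(b'ok')
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, fmt, *args):
        # 静默探针访问日志,避免刷屏
        pass

if __name__ == '__main__':
    # 端口与 Deployment 中的探针配置保持一致
    HTTPServer(('0.0.0.0', 8080), ProbeHandler).serve_forever()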
def detect_browser(self, user_agent: str) -> str:
    """识别浏览器类型(方法签名系根据上下文补全)"""
    ua_lower = user_agent.lower()

    # 先匹配Edge:Edge的UA中同样包含"chrome"(新版标识为"Edg/"),需优先判断
    if 'edg' in ua_lower:
        return 'Edge'
    elif 'chrome' in ua_lower:
        return 'Chrome'
    elif 'firefox' in ua_lower:
        return 'Firefox'
    elif 'safari' in ua_lower and 'chrome' not in ua_lower:
        return 'Safari'
    elif 'bot' in ua_lower or 'crawler' in ua_lower or 'spider' in ua_lower:
        return 'Bot/Crawler'
    else:
        return 'Other'

def is_suspicious_ip(self, ip: str, timestamp: int) -> bool:
    """检查IP是否可疑"""
    # 简单的频率检查
    current_time = timestamp // 1000
    window_start = current_time - 300  # 5分钟窗口

    # 这里应该查询Redis或其他存储来获取IP的请求频率
    # 为了演示,使用简单的逻辑
    request_count = self.get_ip_request_count(ip)
    return request_count > 100  # 5分钟内超过100个请求

def get_ip_request_count(self, ip: str) -> int:
    """获取IP请求计数"""
    # 这里应该从Redis或其他存储获取实际计数
    # 为了演示,返回随机值
    import random
    return random.randint(1, 150)

def is_suspicious_path(self, path: str) -> bool:
    """检查路径是否可疑"""
    suspicious_patterns = [
        '/admin', '/wp-admin', '/.env', '/config',
        '/phpmyadmin', '/sql', '/backup', '/.git'
    ]

    return any(pattern in path.lower() for pattern in suspicious_patterns)

def process_record(self, record: StreamRecord):
    """处理单个记录"""
    try:
        # 应用所有注册的处理器
        for processor_name, processor_func in self.processors.items():
            try:
                result = processor_func(record)
                if result:
                    # 发送处理结果到输出主题
                    self.send_result(processor_name, result)
            except Exception as e:
                logger.error(f"处理器 {processor_name} 执行失败: {e}")
                self.stats['processing_errors'] += 1

        self.stats['messages_processed'] += 1

    except Exception as e:
        logger.error(f"处理记录失败: {e}")
        self.stats['processing_errors'] += 1

def send_result(self, processor_name: str, result: Dict):
    """发送处理结果"""
    try:
        output_topic = self.config['kafka'].get('output_topics', {}).get(
            processor_name, 
            f"goaccess-{processor_name}"
        )

        # 添加元数据
        enriched_result = {
            **result,
            'processor': processor_name,
            'processed_at': datetime.now().isoformat()
        }

        # 发送到Kafka
        future = self.producer.send(
            output_topic,
            value=enriched_result,
            key=f"{processor_name}:{int(time.time())}"
        )

        # 异步处理发送结果
        future.add_callback(self.on_send_success)
        future.add_errback(self.on_send_error)

    except Exception as e:
        logger.error(f"发送结果失败: {e}")

def on_send_success(self, record_metadata):
    """发送成功回调"""
    self.stats['messages_produced'] += 1

def on_send_error(self, exception):
    """发送失败回调"""
    logger.error(f"发送消息失败: {exception}")

def run(self):
    """运行流处理器"""
    self.running = True

    # 注册默认处理器
    self.register_processor('access_log_aggregation', self.process_access_log_aggregation)
    self.register_processor('real_time_alerts', self.process_real_time_alerts)
    self.register_processor('windowed_aggregation', self.process_windowed_aggregation)

    logger.info("Kafka流处理器启动")

    try:
        while self.running:
            # 批量获取消息
            message_pack = self.consumer.poll(
                timeout_ms=self.config['kafka'].get('poll_timeout_ms', 1000),
                max_records=self.config['kafka'].get('max_poll_records', 500)
            )

            if not message_pack:
                continue

            # 处理消息
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    if not self.running:
                        break

                    # 创建流记录
                    stream_record = StreamRecord(
                        key=message.key,
                        value=message.value,
                        timestamp=message.timestamp,
                        partition=message.partition,
                        offset=message.offset
                    )

                    # 处理记录
                    self.process_record(stream_record)

            # 定期更新统计信息
            if int(time.time()) % 60 == 0:  # 每分钟
                self.stats['last_update'] = datetime.now()
                logger.info(f"处理统计: {self.stats}")

    except KeyboardInterrupt:
        logger.info("收到中断信号,停止流处理器")
    except Exception as e:
        logger.error(f"流处理器异常: {e}")
    finally:
        self.stop()

def stop(self):
    """停止流处理器"""
    self.running = False

    try:
        # 刷新生产者缓冲区
        self.producer.flush(timeout=10)

        # 关闭连接
        self.consumer.close()
        self.producer.close()

        logger.info("Kafka流处理器已停止")
    except Exception as e:
        logger.error(f"停止流处理器时发生错误: {e}")

def get_stats(self) -> Dict:
    """获取统计信息"""
    return {
        **self.stats,
        'registered_processors': list(self.processors.keys()),
        'active_windows': len(self.windows)
    }

def main():
    import argparse

    parser = argparse.ArgumentParser(description='Kafka流处理器')
    parser.add_argument('--config', '-c', required=True, help='配置文件路径')
    parser.add_argument('--stats', action='store_true', help='显示统计信息')

    args = parser.parse_args()

    # 创建流处理器
    processor = KafkaStreamProcessor(args.config)

    if args.stats:
        # 显示统计信息
        stats = processor.get_stats()
        print(json.dumps(stats, indent=2, ensure_ascii=False, default=str))
        return

    try:
        # 运行流处理器
        processor.run()
    except KeyboardInterrupt:
        print("\n正在停止流处理器...")
    finally:
        processor.stop()


if __name__ == '__main__':
    main()
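
运行上述流处理器需要通过 --config 指定配置文件。下面按照本节代码中实际读取到的键,给出一个示意性的配置结构草稿(仅覆盖 self.config['kafka'] 中被引用的字段;主题名等具体取值均为假设,实际文件格式以前文 KafkaStreamProcessor 的配置加载实现为准):

# kafka_stream_config 示例(示意性草稿,字段取值均为假设)
kafka_stream_config = {
    "kafka": {
        "poll_timeout_ms": 1000,    # consumer.poll 超时时间,代码中的默认值即 1000
        "max_poll_records": 500,    # 单次 poll 的最大记录数,代码中的默认值即 500
        "output_topics": {
            # 处理器名 -> 输出主题;未配置的处理器会回退为 "goaccess-<processor_name>"
            "real_time_alerts": "security-alerts"   # 假设的主题名
        }
    }
}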

2.2.2 实时数据管道

#!/usr/bin/env python3
# real_time_pipeline.py - 实时数据管道

import json
import time
import logging
import asyncio
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from collections import deque, defaultdict

import aioredis
import aiokafka
from elasticsearch import AsyncElasticsearch
import websockets
import aiohttp
from prometheus_client import Counter, Histogram, Gauge, start_http_server

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus指标
PROCESSED_MESSAGES = Counter('pipeline_processed_messages_total', 'Total processed messages', ['stage'])
PROCESSING_TIME = Histogram('pipeline_processing_seconds', 'Processing time', ['stage'])
ACTIVE_CONNECTIONS = Gauge('pipeline_active_connections', 'Active connections', ['type'])
ERROR_COUNT = Counter('pipeline_errors_total', 'Total errors', ['stage', 'error_type'])

@dataclass
class PipelineMessage:
    """管道消息"""
    id: str
    timestamp: datetime
    source: str
    data: Dict
    metadata: Dict
    stage: str = 'input'
    processing_time: float = 0.0

@dataclass
class PipelineStage:
    """管道阶段"""
    name: str
    processor: Callable
    output_topics: List[str]
    error_topic: Optional[str] = None
    retry_count: int = 3
    timeout: float = 30.0
    parallel: bool = False
    batch_size: int = 1

class RealTimePipeline:
    """实时数据管道"""

def __init__(self, config_file: str):
    self.config = self.load_config(config_file)
    self.stages = {}
    self.connections = {
        'kafka_consumer': None,
        'kafka_producer': None,
        'redis': None,
        'elasticsearch': None,
        'websocket_clients': set()
    }
    self.message_buffer = deque(maxlen=10000)
    self.stats = {
        'total_processed': 0,
        'total_errors': 0,
        'processing_rate': 0.0,
        'last_update': datetime.now(),
        'stage_stats': defaultdict(lambda: {'processed': 0, 'errors': 0, 'avg_time': 0.0})
    }
    self.running = False
    self.executor = ThreadPoolExecutor(max_workers=self.config.get('max_workers', 10))

def load_config(self, config_file: str) -> Dict:
    """加载配置文件"""
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logger.error(f"加载配置文件失败: {e}")
        raise

async def initialize_connections(self):
    """初始化连接"""
    try:
        # 初始化Kafka连接
        await self.initialize_kafka()

        # 初始化Redis连接
        await self.initialize_redis()

        # 初始化Elasticsearch连接
        await self.initialize_elasticsearch()

        logger.info("所有连接初始化完成")

    except Exception as e:
        logger.error(f"初始化连接失败: {e}")
        raise

async def initialize_kafka(self):
    """初始化Kafka连接"""
    kafka_config = self.config.get('kafka', {})

    if kafka_config.get('enabled', False):
        # 初始化消费者
        self.connections['kafka_consumer'] = aiokafka.AIOKafkaConsumer(
            *kafka_config['input_topics'],
            bootstrap_servers=kafka_config['bootstrap_servers'],
            group_id=kafka_config['consumer_group'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None,
            auto_offset_reset=kafka_config.get('auto_offset_reset', 'latest'),
            enable_auto_commit=kafka_config.get('enable_auto_commit', True)
        )

        # 初始化生产者
        self.connections['kafka_producer'] = aiokafka.AIOKafkaProducer(
            bootstrap_servers=kafka_config['bootstrap_servers'],
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            compression_type=kafka_config.get('compression_type', 'gzip')
        )

        # 启动连接
        await self.connections['kafka_consumer'].start()
        await self.connections['kafka_producer'].start()

        ACTIVE_CONNECTIONS.labels(type='kafka').set(1)
        logger.info("Kafka连接初始化成功")

async def initialize_redis(self):
    """初始化Redis连接"""
    redis_config = self.config.get('redis', {})

    if redis_config.get('enabled', False):
        self.connections['redis'] = await aioredis.create_redis_pool(
            f"redis://{redis_config['host']}:{redis_config['port']}",
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            encoding='utf-8'
        )

        ACTIVE_CONNECTIONS.labels(type='redis').set(1)
        logger.info("Redis连接初始化成功")

async def initialize_elasticsearch(self):
    """初始化Elasticsearch连接"""
    es_config = self.config.get('elasticsearch', {})

    if es_config.get('enabled', False):
        self.connections['elasticsearch'] = AsyncElasticsearch(
            hosts=es_config['hosts'],
            timeout=es_config.get('timeout', 30)
        )

        # 测试连接
        if await self.connections['elasticsearch'].ping():
            ACTIVE_CONNECTIONS.labels(type='elasticsearch').set(1)
            logger.info("Elasticsearch连接初始化成功")
        else:
            logger.error("Elasticsearch连接测试失败")

def register_stage(self, stage: PipelineStage):
    """注册管道阶段"""
    self.stages[stage.name] = stage
    logger.info(f"注册管道阶段: {stage.name}")

async def process_message(self, message: PipelineMessage, stage_name: str) -> List[PipelineMessage]:
    """处理消息"""
    stage = self.stages.get(stage_name)
    if not stage:
        logger.error(f"未找到阶段: {stage_name}")
        return []

    start_time = time.time()

    try:
        # 更新消息阶段
        message.stage = stage_name

        # 执行处理器
        if asyncio.iscoroutinefunction(stage.processor):
            result = await asyncio.wait_for(
                stage.processor(message),
                timeout=stage.timeout
            )
        else:
            # 在线程池中执行同步函数
            result = await asyncio.get_event_loop().run_in_executor(
                self.executor,
                stage.processor,
                message
            )

        # 处理结果
        if result is None:
            return []
        elif isinstance(result, list):
            output_messages = result
        else:
            output_messages = [result]

        # 更新处理时间
        processing_time = time.time() - start_time
        for msg in output_messages:
            if isinstance(msg, PipelineMessage):
                msg.processing_time += processing_time

        # 更新统计信息
        PROCESSED_MESSAGES.labels(stage=stage_name).inc()
        PROCESSING_TIME.labels(stage=stage_name).observe(processing_time)

        self.stats['stage_stats'][stage_name]['processed'] += 1
        self.stats['stage_stats'][stage_name]['avg_time'] = (
            (self.stats['stage_stats'][stage_name]['avg_time'] + processing_time) / 2
        )

        return output_messages

    except asyncio.TimeoutError:
        logger.error(f"阶段 {stage_name} 处理超时")
        ERROR_COUNT.labels(stage=stage_name, error_type='timeout').inc()
        self.stats['stage_stats'][stage_name]['errors'] += 1
        return []
    except Exception as e:
        logger.error(f"阶段 {stage_name} 处理失败: {e}")
        ERROR_COUNT.labels(stage=stage_name, error_type='processing').inc()
        self.stats['stage_stats'][stage_name]['errors'] += 1

        # 发送到错误主题
        if stage.error_topic:
            await self.send_to_error_topic(message, stage.error_topic, str(e))

        return []

async def send_to_error_topic(self, message: PipelineMessage, error_topic: str, error_msg: str):
    """发送到错误主题"""
    try:
        error_data = {
            'original_message': asdict(message),
            'error': error_msg,
            'timestamp': datetime.now().isoformat(),
            'stage': message.stage
        }

        if self.connections['kafka_producer']:
            await self.connections['kafka_producer'].send(
                error_topic,
                value=error_data,
                key=message.id
            )
    except Exception as e:
        logger.error(f"发送错误消息失败: {e}")

async def send_to_outputs(self, messages: List[PipelineMessage], output_topics: List[str]):
    """发送到输出主题"""
    for message in messages:
        for topic in output_topics:
            try:
                await self.send_message(message, topic)
            except Exception as e:
                logger.error(f"发送消息到 {topic} 失败: {e}")

async def send_message(self, message: PipelineMessage, topic: str):
    """发送消息"""
    message_data = asdict(message)

    # 发送到Kafka
    if self.connections['kafka_producer'] and topic.startswith('kafka:'):
        kafka_topic = topic.replace('kafka:', '')
        await self.connections['kafka_producer'].send(
            kafka_topic,
            value=message_data,
            key=message.id
        )

    # 发送到Redis
    elif self.connections['redis'] and topic.startswith('redis:'):
        redis_key = topic.replace('redis:', '')
        await self.connections['redis'].lpush(
            redis_key,
            json.dumps(message_data, ensure_ascii=False)
        )
        await self.connections['redis'].ltrim(redis_key, 0, 9999)

    # 发送到Elasticsearch
    elif self.connections['elasticsearch'] and topic.startswith('elasticsearch:'):
        index_name = topic.replace('elasticsearch:', '')
        await self.connections['elasticsearch'].index(
            index=index_name,
            id=message.id,
            body=message_data
        )

    # 发送到WebSocket客户端
    elif topic == 'websocket':
        await self.broadcast_to_websockets(message_data)

    # 发送到HTTP端点
    elif topic.startswith('http:'):
        url = topic.replace('http:', '')
        await self.send_to_http_endpoint(message_data, url)

async def broadcast_to_websockets(self, data: Dict):
    """广播到WebSocket客户端"""
    if not self.connections['websocket_clients']:
        return

    message = json.dumps(data, ensure_ascii=False)
    disconnected_clients = set()

    for client in self.connections['websocket_clients']:
        try:
            await client.send(message)
        except websockets.exceptions.ConnectionClosed:
            disconnected_clients.add(client)
        except Exception as e:
            logger.error(f"WebSocket发送失败: {e}")
            disconnected_clients.add(client)

    # 移除断开的客户端
    self.connections['websocket_clients'] -= disconnected_clients
    ACTIVE_CONNECTIONS.labels(type='websocket').set(len(self.connections['websocket_clients']))

async def send_to_http_endpoint(self, data: Dict, url: str):
    """发送到HTTP端点"""
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                json=data,
                timeout=aiohttp.ClientTimeout(total=10)
            ) as response:
                if response.status >= 400:
                    logger.error(f"HTTP发送失败: {response.status}")
    except Exception as e:
        logger.error(f"HTTP发送异常: {e}")

async def run_pipeline(self):
    """运行管道"""
    self.running = True

    # 注册默认处理阶段
    await self.register_default_stages()

    logger.info("实时数据管道启动")

    try:
        # 启动消息消费循环
        await self.consume_messages()
    except Exception as e:
        logger.error(f"管道运行异常: {e}")
    finally:
        await self.cleanup()

async def register_default_stages(self):
    """注册默认处理阶段"""
    # 数据清洗阶段
    self.register_stage(PipelineStage(
        name='data_cleaning',
        processor=self.clean_data,
        output_topics=['kafka:cleaned_data'],
        error_topic='kafka:cleaning_errors'
    ))

    # 数据增强阶段
    self.register_stage(PipelineStage(
        name='data_enrichment',
        processor=self.enrich_data,
        output_topics=['kafka:enriched_data'],
        error_topic='kafka:enrichment_errors'
    ))

    # 实时分析阶段
    self.register_stage(PipelineStage(
        name='real_time_analysis',
        processor=self.analyze_data,
        output_topics=['elasticsearch:goaccess-analysis', 'websocket'],
        error_topic='kafka:analysis_errors'
    ))

    # 告警检测阶段
    self.register_stage(PipelineStage(
        name='alert_detection',
        processor=self.detect_alerts,
        output_topics=['kafka:alerts', 'http:http://alert-manager:9093/api/v1/alerts'],
        error_topic='kafka:alert_errors'
    ))

async def clean_data(self, message: PipelineMessage) -> Optional[PipelineMessage]:
    """数据清洗"""
    try:
        data = message.data.copy()

        # 清理空值
        cleaned_data = {k: v for k, v in data.items() if v is not None and v != ''}

        # 标准化IP地址
        if 'ip' in cleaned_data:
            ip = cleaned_data['ip']
            if ip.startswith('::ffff:'):
                cleaned_data['ip'] = ip.replace('::ffff:', '')

        # 标准化时间戳
        if 'timestamp' in cleaned_data:
            try:
                if isinstance(cleaned_data['timestamp'], str):
                    cleaned_data['timestamp'] = datetime.fromisoformat(
                        cleaned_data['timestamp'].replace('Z', '+00:00')
                    ).isoformat()
            except (ValueError, TypeError):
                cleaned_data['timestamp'] = datetime.now().isoformat()

        # 清理用户代理字符串
        if 'user_agent' in cleaned_data:
            ua = cleaned_data['user_agent']
            if len(ua) > 500:  # 截断过长的用户代理
                cleaned_data['user_agent'] = ua[:500] + '...'

        # 验证必需字段
        required_fields = ['ip', 'timestamp', 'path']
        if not all(field in cleaned_data for field in required_fields):
            logger.warning(f"消息缺少必需字段: {message.id}")
            return None

        return PipelineMessage(
            id=message.id,
            timestamp=message.timestamp,
            source=message.source,
            data=cleaned_data,
            metadata={**message.metadata, 'cleaned': True},
            stage='data_cleaning'
        )

    except Exception as e:
        logger.error(f"数据清洗失败: {e}")
        return None

async def enrich_data(self, message: PipelineMessage) -> Optional[PipelineMessage]:
    """数据增强"""
    try:
        data = message.data.copy()
        enriched_metadata = message.metadata.copy()

        # 地理位置增强
        if 'ip' in data:
            geo_info = await self.get_geo_info(data['ip'])
            if geo_info:
                data['geo'] = geo_info

        # 用户代理解析
        if 'user_agent' in data:
            ua_info = self.parse_user_agent(data['user_agent'])
            if ua_info:
                data['browser'] = ua_info

        # 路径分类
        if 'path' in data:
            path_category = self.categorize_path(data['path'])
            data['path_category'] = path_category

        # 添加处理时间戳
        enriched_metadata['enriched_at'] = datetime.now().isoformat()

        return PipelineMessage(
            id=message.id,
            timestamp=message.timestamp,
            source=message.source,
            data=data,
            metadata=enriched_metadata,
            stage='data_enrichment'
        )

    except Exception as e:
        logger.error(f"数据增强失败: {e}")
        return None

async def get_geo_info(self, ip: str) -> Optional[Dict]:
    """获取地理位置信息"""
    try:
        # 这里可以集成GeoIP数据库或API
        # 为了演示,返回模拟数据
        if ip.startswith('192.168.') or ip.startswith('10.') or ip.startswith('172.'):
            return {
                'country': 'Private',
                'city': 'Local',
                'latitude': 0.0,
                'longitude': 0.0
            }

        # 模拟外部IP的地理信息
        return {
            'country': 'Unknown',
            'city': 'Unknown',
            'latitude': 0.0,
            'longitude': 0.0
        }
    except Exception as e:
        logger.error(f"获取地理位置信息失败: {e}")
        return None

def parse_user_agent(self, user_agent: str) -> Optional[Dict]:
    """解析用户代理"""
    try:
        # 简单的用户代理解析
        ua_lower = user_agent.lower()

        browser = 'Unknown'
        os = 'Unknown'
        device_type = 'Desktop'

        # 浏览器检测(Edge的UA中同样包含"chrome",新版标识为"Edg/",需优先匹配)
        if 'edg' in ua_lower:
            browser = 'Edge'
        elif 'chrome' in ua_lower:
            browser = 'Chrome'
        elif 'firefox' in ua_lower:
            browser = 'Firefox'
        elif 'safari' in ua_lower and 'chrome' not in ua_lower:
            browser = 'Safari'
        elif 'bot' in ua_lower or 'crawler' in ua_lower:
            browser = 'Bot'
            device_type = 'Bot'

        # 操作系统检测
        if 'windows' in ua_lower:
            os = 'Windows'
        elif 'mac' in ua_lower or 'darwin' in ua_lower:
            os = 'macOS'
        elif 'linux' in ua_lower:
            os = 'Linux'
        elif 'android' in ua_lower:
            os = 'Android'
            device_type = 'Mobile'
        elif 'ios' in ua_lower or 'iphone' in ua_lower or 'ipad' in ua_lower:
            os = 'iOS'
            device_type = 'Mobile' if 'iphone' in ua_lower else 'Tablet'

        return {
            'browser': browser,
            'os': os,
            'device_type': device_type
        }

    except Exception as e:
        logger.error(f"解析用户代理失败: {e}")
        return None

def categorize_path(self, path: str) -> str:
    """路径分类"""
    try:
        path_lower = path.lower()

        if path_lower.startswith('/api/'):
            return 'API'
        elif path_lower.startswith('/admin/'):
            return 'Admin'
        elif path_lower.startswith('/static/') or path_lower.startswith('/assets/'):
            return 'Static'
        elif path_lower.endswith(('.css', '.js', '.png', '.jpg', '.jpeg', '.gif', '.ico')):
            return 'Asset'
        elif path_lower.endswith(('.html', '.htm', '.php', '.asp', '.jsp')):
            return 'Page'
        else:
            return 'Other'

    except Exception as e:
        logger.error(f"路径分类失败: {e}")
        return 'Unknown'