目录

微服务架构集成

1. 微服务日志聚合架构

graph TB
    subgraph "微服务集群"
        A[API Gateway] --> B[User Service]
        A --> C[Order Service]
        A --> D[Payment Service]
        A --> E[Notification Service]
        
        B --> F[User DB]
        C --> G[Order DB]
        D --> H[Payment DB]
    end
    
    subgraph "日志收集层"
        I[Filebeat Agent]
        J[Metricbeat Agent]
        K[APM Agent]
    end
    
    subgraph "消息队列"
        L[Kafka/RabbitMQ]
    end
    
    subgraph "ELK Stack"
        M[Logstash Cluster]
        N[Elasticsearch Cluster]
        O[Kibana]
    end
    
    B --> I
    C --> I
    D --> I
    E --> I
    
    B --> J
    C --> J
    D --> J
    E --> J
    
    B --> K
    C --> K
    D --> K
    E --> K
    
    I --> L
    J --> L
    K --> L
    
    L --> M
    M --> N
    N --> O

2. 分布式追踪集成

Jaeger集成配置:

# docker-compose.yml
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "14268:14268"
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    ports:
      - "5601:5601"
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    depends_on:
      - elasticsearch

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
      - ./logstash/config:/usr/share/logstash/config
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

Spring Boot微服务集成:

// pom.xml
<dependencies>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-api</artifactId>
    </dependency>
    <dependency>
        <groupId>io.opentelemetry</groupId>
        <artifactId>opentelemetry-exporter-jaeger</artifactId>
    </dependency>
    <dependency>
        <groupId>net.logstash.logback</groupId>
        <artifactId>logstash-logback-encoder</artifactId>
        <version>7.4</version>
    </dependency>
</dependencies>

// TracingConfiguration.java
@Configuration
public class TracingConfiguration {
    
    /**
     * Wires up the OpenTelemetry SDK entry point for this service.
     *
     * <p>Spans are exported in batches over gRPC to the Jaeger collector at
     * {@code http://jaeger:14250}, and the SDK resource tags every span with
     * {@code service.name = "user-service"} so traces can be filtered per
     * service in the tracing UI.
     */
    @Bean
    public OpenTelemetry openTelemetry() {
        return OpenTelemetrySdk.builder()
            .setTracerProvider(
                SdkTracerProvider.builder()
                    // Batch processor buffers spans and ships them to Jaeger
                    // asynchronously instead of per-span.
                    .addSpanProcessor(BatchSpanProcessor.builder(
                        JaegerGrpcSpanExporter.builder()
                            .setEndpoint("http://jaeger:14250")
                            .build())
                        .build())
                    .setResource(Resource.getDefault()
                        .merge(Resource.create(
                            Attributes.of(ResourceAttributes.SERVICE_NAME, "user-service"))))
                    .build())
            .build();
    }
}

// UserController.java
@RestController
@RequestMapping("/api/users")
public class UserController {
    
    private static final Logger logger = LoggerFactory.getLogger(UserController.class);
    private final Tracer tracer;

    // Fix: the original snippet called userService.findById(...) without ever
    // declaring the field, so it would not compile. Field injection is used
    // here so the existing public constructor signature stays unchanged;
    // constructor injection would otherwise be the preferred style.
    @Autowired
    private UserService userService;
    
    public UserController(OpenTelemetry openTelemetry) {
        this.tracer = openTelemetry.getTracer("user-service");
    }
    
    /**
     * GET /api/users/{id} — look up a single user.
     *
     * <p>Wraps the lookup in a "get-user" span tagged with the user id, logs
     * inside the span scope (so traceId/spanId land in the MDC), marks the
     * span status on success/failure, and always ends the span.
     */
    @GetMapping("/{id}")
    public ResponseEntity<User> getUser(@PathVariable Long id) {
        Span span = tracer.spanBuilder("get-user")
            .setAttribute("user.id", id)
            .startSpan();
        
        try (Scope scope = span.makeCurrent()) {
            logger.info("Getting user with id: {}", id);
            
            // Business logic
            User user = userService.findById(id);
            
            span.setStatus(StatusCode.OK);
            return ResponseEntity.ok(user);
            
        } catch (Exception e) {
            span.setStatus(StatusCode.ERROR, e.getMessage());
            logger.error("Error getting user: {}", e.getMessage(), e);
            throw e;
        } finally {
            span.end();
        }
    }
}

Logback配置:

<!-- logback-spring.xml -->
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    
    <springProfile name="!local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers>
                    <timestamp/>
                    <logLevel/>
                    <loggerName/>
                    <message/>
                    <mdc/>
                    <arguments/>
                    <stackTrace/>
                    <pattern>
                        <pattern>
                            {
                                "service": "user-service",
                                "version": "${app.version:-unknown}",
                                "environment": "${spring.profiles.active:-unknown}",
                                "trace_id": "%X{traceId:-}",
                                "span_id": "%X{spanId:-}"
                            }
                        </pattern>
                    </pattern>
                </providers>
            </encoder>
        </appender>
    </springProfile>
    
    <springProfile name="local">
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level [%X{traceId:-},%X{spanId:-}] %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    </springProfile>
    
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>

3. 服务网格集成

Istio + ELK集成:

# istio-telemetry.yaml
apiVersion: telemetry.istio.io/v1alpha1
kind: Telemetry
metadata:
  name: default
  namespace: istio-system
spec:
  accessLogging:
  - providers:
    - name: otel
  - providers:
    - name: elasticsearch
  metrics:
  - providers:
    - name: prometheus
  tracing:
  - providers:
    - name: jaeger

---
apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
metadata:
  name: control-plane
spec:
  meshConfig:
    extensionProviders:
    - name: elasticsearch
      envoyFileAccessLog:
        service: elasticsearch.logging.svc.cluster.local
        port: 9200
        logFormat:
          labels:
            custom_label: "istio-access-log"
    - name: jaeger
      zipkin:
        service: jaeger-collector.istio-system.svc.cluster.local
        port: 9411

Envoy访问日志配置:

{
  "access_log": [
    {
      "name": "envoy.access_loggers.http_grpc",
      "typed_config": {
        "@type": "type.googleapis.com/envoy.extensions.access_loggers.grpc.v3.HttpGrpcAccessLogConfig",
        "common_config": {
          "log_name": "istio-access-log",
          "grpc_service": {
            "envoy_grpc": {
              "cluster_name": "logstash-cluster"
            }
          }
        },
        "additional_request_headers_to_log": [
          "x-request-id",
          "x-trace-id",
          "user-agent"
        ],
        "additional_response_headers_to_log": [
          "x-response-time"
        ]
      }
    }
  ]
}

云平台部署

1. AWS部署架构

# aws-elk-stack.yaml
AWSTemplateFormatVersion: '2010-09-09'
Description: 'ELK Stack on AWS'

Parameters:
  VpcId:
    Type: AWS::EC2::VPC::Id
    Description: VPC ID for ELK deployment
  
  SubnetIds:
    Type: List<AWS::EC2::Subnet::Id>
    Description: Subnet IDs for ELK deployment
  
  InstanceType:
    Type: String
    Default: t3.large
    Description: EC2 instance type

Resources:
  # Elasticsearch Service
  ElasticsearchDomain:
    Type: AWS::Elasticsearch::Domain
    Properties:
      DomainName: !Sub '${AWS::StackName}-elasticsearch'
      ElasticsearchVersion: '7.10'
      ElasticsearchClusterConfig:
        InstanceType: !Ref InstanceType
        InstanceCount: 3
        DedicatedMasterEnabled: true
        MasterInstanceType: t3.small
        MasterInstanceCount: 3
      EBSOptions:
        EBSEnabled: true
        VolumeType: gp3
        VolumeSize: 100
      VPCOptions:
        SubnetIds: !Ref SubnetIds
        SecurityGroupIds:
          - !Ref ElasticsearchSecurityGroup
      AccessPolicies:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              AWS: '*'
            Action: 'es:*'
            Resource: !Sub 'arn:aws:es:${AWS::Region}:${AWS::AccountId}:domain/${AWS::StackName}-elasticsearch/*'
      EncryptionAtRestOptions:
        Enabled: true
      NodeToNodeEncryptionOptions:
        Enabled: true
      DomainEndpointOptions:
        EnforceHTTPS: true

  # Kibana ALB
  KibanaLoadBalancer:
    Type: AWS::ElasticLoadBalancingV2::LoadBalancer
    Properties:
      Name: !Sub '${AWS::StackName}-kibana-alb'
      Type: application
      Scheme: internet-facing
      Subnets: !Ref SubnetIds
      SecurityGroups:
        - !Ref KibanaSecurityGroup

  # Logstash Auto Scaling Group
  LogstashLaunchTemplate:
    Type: AWS::EC2::LaunchTemplate
    Properties:
      LaunchTemplateName: !Sub '${AWS::StackName}-logstash'
      LaunchTemplateData:
        ImageId: ami-0abcdef1234567890  # Amazon Linux 2
        InstanceType: !Ref InstanceType
        SecurityGroupIds:
          - !Ref LogstashSecurityGroup
        UserData:
          Fn::Base64: !Sub |
            #!/bin/bash
            yum update -y
            yum install -y docker
            service docker start
            usermod -a -G docker ec2-user
            
            # Install Logstash
            docker run -d \
              --name logstash \
              --restart unless-stopped \
              -p 5044:5044 \
              -e ELASTICSEARCH_HOSTS=${ElasticsearchDomain.DomainEndpoint} \
              docker.elastic.co/logstash/logstash:8.11.0

  LogstashAutoScalingGroup:
    Type: AWS::AutoScaling::AutoScalingGroup
    Properties:
      AutoScalingGroupName: !Sub '${AWS::StackName}-logstash-asg'
      LaunchTemplate:
        LaunchTemplateId: !Ref LogstashLaunchTemplate
        Version: !GetAtt LogstashLaunchTemplate.LatestVersionNumber
      MinSize: 2
      MaxSize: 10
      DesiredCapacity: 3
      VPCZoneIdentifier: !Ref SubnetIds
      TargetGroupARNs:
        - !Ref LogstashTargetGroup

  # Security Groups
  ElasticsearchSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Elasticsearch
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref KibanaSecurityGroup
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          SourceSecurityGroupId: !Ref LogstashSecurityGroup

  KibanaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Kibana
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 80
          ToPort: 80
          CidrIp: 0.0.0.0/0
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0

  LogstashSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for Logstash
      VpcId: !Ref VpcId
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 5044
          ToPort: 5044
          CidrIp: 10.0.0.0/8

Outputs:
  ElasticsearchEndpoint:
    Description: Elasticsearch domain endpoint
    Value: !GetAtt ElasticsearchDomain.DomainEndpoint
    Export:
      Name: !Sub '${AWS::StackName}-elasticsearch-endpoint'
  
  KibanaURL:
    Description: Kibana URL
    Value: !Sub 'https://${ElasticsearchDomain.DomainEndpoint}/_plugin/kibana/'
    Export:
      Name: !Sub '${AWS::StackName}-kibana-url'

2. Kubernetes部署

Helm Chart配置:

# values.yaml
elasticsearch:
  enabled: true
  replicas: 3
  minimumMasterNodes: 2
  
  esConfig:
    elasticsearch.yml: |
      cluster.name: "elasticsearch"
      network.host: 0.0.0.0
      discovery.seed_hosts: "elasticsearch-master-headless"
      cluster.initial_master_nodes: "elasticsearch-master-0,elasticsearch-master-1,elasticsearch-master-2"
      
      # Security
      xpack.security.enabled: true
      xpack.security.transport.ssl.enabled: true
      xpack.security.transport.ssl.verification_mode: certificate
      xpack.security.transport.ssl.keystore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
      xpack.security.transport.ssl.truststore.path: /usr/share/elasticsearch/config/certs/elastic-certificates.p12
      
      # Monitoring
      xpack.monitoring.collection.enabled: true
  
  resources:
    requests:
      cpu: "1000m"
      memory: "2Gi"
    limits:
      cpu: "2000m"
      memory: "4Gi"
  
  volumeClaimTemplate:
    accessModes: ["ReadWriteOnce"]
    storageClassName: "fast-ssd"
    resources:
      requests:
        storage: 100Gi

kibana:
  enabled: true
  replicas: 2
  
  kibanaConfig:
    kibana.yml: |
      server.host: 0.0.0.0
      elasticsearch.hosts: ["https://elasticsearch-master:9200"]
      elasticsearch.username: "kibana_system"
      elasticsearch.password: "${ELASTICSEARCH_PASSWORD}"
      
      # Security
      xpack.security.enabled: true
      xpack.security.encryptionKey: "${KIBANA_ENCRYPTION_KEY}"
      
      # Monitoring
      xpack.monitoring.ui.container.elasticsearch.enabled: true
  
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
  
  service:
    type: LoadBalancer
    port: 5601

logstash:
  enabled: true
  replicas: 3
  
  logstashConfig:
    logstash.yml: |
      http.host: 0.0.0.0
      xpack.monitoring.elasticsearch.hosts: ["https://elasticsearch-master:9200"]
      xpack.monitoring.elasticsearch.username: "logstash_system"
      xpack.monitoring.elasticsearch.password: "${ELASTICSEARCH_PASSWORD}"
  
  logstashPipeline:
    logstash.conf: |
      input {
        beats {
          port => 5044
        }
        http {
          port => 8080
        }
      }
      
      filter {
        if [fields][log_type] == "application" {
          grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
          }
          date {
            match => [ "timestamp", "ISO8601" ]
          }
        }
      }
      
      output {
        elasticsearch {
          hosts => ["https://elasticsearch-master:9200"]
          user => "logstash_writer"
          password => "${ELASTICSEARCH_PASSWORD}"
          index => "logs-%{+YYYY.MM.dd}"
        }
      }
  
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"
  
  service:
    type: ClusterIP
    ports:
      - name: beats
        port: 5044
        protocol: TCP
      - name: http
        port: 8080
        protocol: TCP

filebeat:
  enabled: true
  daemonset:
    enabled: true
  
  filebeatConfig:
    filebeat.yml: |
      filebeat.inputs:
      - type: container
        paths:
          - /var/log/containers/*.log
        processors:
        - add_kubernetes_metadata:
            host: ${NODE_NAME}
            matchers:
            - logs_path:
                logs_path: "/var/log/containers/"
      
      output.logstash:
        hosts: ["logstash:5044"]
      
      setup.kibana:
        host: "kibana:5601"
  
  resources:
    requests:
      cpu: "100m"
      memory: "100Mi"
    limits:
      cpu: "200m"
      memory: "200Mi"

metricbeat:
  enabled: true
  daemonset:
    enabled: true
  
  metricbeatConfig:
    metricbeat.yml: |
      metricbeat.modules:
      - module: system
        metricsets:
          - cpu
          - load
          - memory
          - network
          - process
          - process_summary
        enabled: true
        period: 10s
        processes: ['.*']
      
      - module: kubernetes
        metricsets:
          - node
          - system
          - pod
          - container
          - volume
        enabled: true
        period: 10s
        host: ${NODE_NAME}
        hosts: ["https://${NODE_NAME}:10250"]
      
      output.elasticsearch:
        hosts: ["https://elasticsearch-master:9200"]
        username: "metricbeat_writer"
        password: "${ELASTICSEARCH_PASSWORD}"
        index: "metricbeat-%{+yyyy.MM.dd}"
  
  resources:
    requests:
      cpu: "100m"
      memory: "100Mi"
    limits:
      cpu: "200m"
      memory: "200Mi"

部署脚本:

#!/bin/bash
# deploy-elk-k8s.sh
#
# Deploy the ELK stack (Elasticsearch, Kibana, Logstash, Filebeat,
# Metricbeat) to Kubernetes using the official Elastic Helm charts.
#
# Fix: the original ran plain "kubectl create secret" which fails with
# AlreadyExists on a re-run and aborts the whole script under "set -e";
# re-applying blindly would also regenerate the random password and break a
# running deployment. Secrets are now created only when absent, making the
# script safely re-runnable.

set -e

NAMESPACE="elk-stack"
RELEASE_NAME="elk"
CHART_VERSION="8.11.0"

# Create the namespace (idempotent)
kubectl create namespace $NAMESPACE --dry-run=client -o yaml | kubectl apply -f -

# Generate credentials only on first run so re-runs keep the password the
# running stack was deployed with.
if ! kubectl get secret elasticsearch-credentials --namespace=$NAMESPACE >/dev/null 2>&1; then
  kubectl create secret generic elasticsearch-credentials \
    --from-literal=username=elastic \
    --from-literal=password="$(openssl rand -base64 32)" \
    --namespace=$NAMESPACE
fi

if ! kubectl get secret kibana-encryption-key --namespace=$NAMESPACE >/dev/null 2>&1; then
  kubectl create secret generic kibana-encryption-key \
    --from-literal=encryptionkey="$(openssl rand -base64 32)" \
    --namespace=$NAMESPACE
fi

# Add the Elastic Helm repository
helm repo add elastic https://helm.elastic.co
helm repo update

# Deploy Elasticsearch
helm upgrade --install elasticsearch elastic/elasticsearch \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=elasticsearch-values.yaml \
  --wait

# Deploy Kibana
helm upgrade --install kibana elastic/kibana \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=kibana-values.yaml \
  --wait

# Deploy Logstash
helm upgrade --install logstash elastic/logstash \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=logstash-values.yaml \
  --wait

# Deploy Filebeat
helm upgrade --install filebeat elastic/filebeat \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=filebeat-values.yaml \
  --wait

# Deploy Metricbeat
helm upgrade --install metricbeat elastic/metricbeat \
  --version=$CHART_VERSION \
  --namespace=$NAMESPACE \
  --values=metricbeat-values.yaml \
  --wait

echo "ELK Stack deployed successfully!"
echo "Kibana URL: http://$(kubectl get svc kibana-kibana -n $NAMESPACE -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):5601"
echo "Elasticsearch URL: http://$(kubectl get svc elasticsearch-master -n $NAMESPACE -o jsonpath='{.spec.clusterIP}'):9200"

第三方工具集成

1. Prometheus集成

Prometheus配置:

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "elk_rules.yml"

scrape_configs:
  - job_name: 'elasticsearch'
    static_configs:
      - targets: ['elasticsearch:9200']
    metrics_path: /_prometheus/metrics
    scrape_interval: 30s
  
  - job_name: 'kibana'
    static_configs:
      - targets: ['kibana:5601']
    metrics_path: /api/status
    scrape_interval: 30s
  
  - job_name: 'logstash'
    static_configs:
      - targets: ['logstash:9600']
    metrics_path: /_node/stats
    scrape_interval: 30s

remote_write:
  - url: "http://elasticsearch:9200/_prometheus/api/v1/write"
    queue_config:
      max_samples_per_send: 1000
      max_shards: 200
      capacity: 2500

Elasticsearch Exporter:

# elasticsearch-exporter.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch-exporter
  namespace: monitoring
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch-exporter
  template:
    metadata:
      labels:
        app: elasticsearch-exporter
    spec:
      containers:
      - name: elasticsearch-exporter
        image: quay.io/prometheuscommunity/elasticsearch-exporter:latest
        args:
          - '--es.uri=http://elasticsearch:9200'
          - '--es.all'
          - '--es.indices'
          - '--es.indices_settings'
          - '--es.shards'
          - '--es.snapshots'
          - '--es.timeout=30s'
        ports:
        - containerPort: 9114
          name: http
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 200m
            memory: 256Mi
---
apiVersion: v1
kind: Service
metadata:
  name: elasticsearch-exporter
  namespace: monitoring
  labels:
    app: elasticsearch-exporter
spec:
  ports:
  - port: 9114
    targetPort: 9114
    name: http
  selector:
    app: elasticsearch-exporter

2. Grafana集成

Grafana数据源配置:

{
  "datasources": [
    {
      "name": "Elasticsearch",
      "type": "elasticsearch",
      "access": "proxy",
      "url": "http://elasticsearch:9200",
      "database": "logs-*",
      "basicAuth": false,
      "isDefault": false,
      "jsonData": {
        "timeField": "@timestamp",
        "esVersion": "8.0.0",
        "maxConcurrentShardRequests": 5,
        "logMessageField": "message",
        "logLevelField": "level"
      }
    },
    {
      "name": "Prometheus",
      "type": "prometheus",
      "access": "proxy",
      "url": "http://prometheus:9090",
      "isDefault": true,
      "jsonData": {
        "timeInterval": "15s"
      }
    }
  ]
}

ELK监控仪表板:

{
  "dashboard": {
    "title": "ELK Stack Monitoring",
    "tags": ["elk", "monitoring"],
    "timezone": "browser",
    "panels": [
      {
        "title": "Elasticsearch Cluster Health",
        "type": "stat",
        "targets": [
          {
            "expr": "elasticsearch_cluster_health_status",
            "legendFormat": "Cluster Status"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "mappings": [
              {"options": {"0": {"text": "Red", "color": "red"}}},
              {"options": {"1": {"text": "Yellow", "color": "yellow"}}},
              {"options": {"2": {"text": "Green", "color": "green"}}}
            ]
          }
        }
      },
      {
        "title": "Elasticsearch Nodes",
        "type": "stat",
        "targets": [
          {
            "expr": "elasticsearch_cluster_health_number_of_nodes",
            "legendFormat": "Total Nodes"
          },
          {
            "expr": "elasticsearch_cluster_health_number_of_data_nodes",
            "legendFormat": "Data Nodes"
          }
        ]
      },
      {
        "title": "Indexing Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(elasticsearch_indices_indexing_index_total[5m])",
            "legendFormat": "{{node}}"
          }
        ]
      },
      {
        "title": "Search Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(elasticsearch_indices_search_query_total[5m])",
            "legendFormat": "{{node}}"
          }
        ]
      },
      {
        "title": "JVM Heap Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "elasticsearch_jvm_memory_used_bytes{area=\"heap\"} / elasticsearch_jvm_memory_max_bytes{area=\"heap\"} * 100",
            "legendFormat": "{{node}}"
          }
        ],
        "yAxes": [
          {
            "unit": "percent",
            "max": 100
          }
        ]
      },
      {
        "title": "Disk Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "elasticsearch_filesystem_data_used_bytes / elasticsearch_filesystem_data_size_bytes * 100",
            "legendFormat": "{{node}}"
          }
        ]
      }
    ]
  }
}

3. Slack集成

Slack通知配置:

{
  "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK",
  "channel": "#elk-alerts",
  "username": "ELK-Bot",
  "icon_emoji": ":warning:",
  "templates": {
    "error_alert": {
      "color": "danger",
      "title": "ELK Stack Alert",
      "fields": [
        {
          "title": "Alert Type",
          "value": "{{alert_type}}",
          "short": true
        },
        {
          "title": "Severity",
          "value": "{{severity}}",
          "short": true
        },
        {
          "title": "Message",
          "value": "{{message}}",
          "short": false
        },
        {
          "title": "Timestamp",
          "value": "{{timestamp}}",
          "short": true
        }
      ]
    }
  }
}

Logstash Slack输出插件:

# logstash-slack-output.conf
filter {
  if [level] == "ERROR" or [level] == "FATAL" {
    mutate {
      add_tag => [ "alert" ]
    }
  }
}

output {
  if "alert" in [tags] {
    slack {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      channel => "#alerts"
      username => "Logstash"
      icon_emoji => ":exclamation:"
      format => "Error detected: %{message} from %{host} at %{@timestamp}"
    }
  }
  
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}

机器学习应用

1. 异常检测

ML Job配置:

{
  "job_id": "log_anomaly_detection",
  "description": "检测日志中的异常模式",
  "analysis_config": {
    "bucket_span": "15m",
    "detectors": [
      {
        "detector_description": "count by level",
        "function": "count",
        "by_field_name": "level.keyword"
      },
      {
        "detector_description": "rare message",
        "function": "rare",
        "by_field_name": "message.keyword"
      },
      {
        "detector_description": "high response time",
        "function": "high_mean",
        "field_name": "response_time",
        "over_field_name": "service.keyword"
      }
    ],
    "influencers": [
      "level.keyword",
      "service.keyword",
      "host.name"
    ]
  },
  "data_description": {
    "time_field": "@timestamp",
    "time_format": "epoch_ms"
  },
  "model_plot_config": {
    "enabled": true,
    "terms": "level.keyword,service.keyword"
  },
  "analysis_limits": {
    "model_memory_limit": "256mb",
    "categorization_examples_limit": 4
  },
  "datafeed_config": {
    "datafeed_id": "datafeed-log_anomaly_detection",
    "indices": ["logs-*"],
    "query": {
      "bool": {
        "must": [
          {
            "range": {
              "@timestamp": {
                "gte": "now-30d"
              }
            }
          }
        ]
      }
    },
    "scroll_size": 1000,
    "frequency": "150s",
    "query_delay": "60s"
  }
}

异常检测API使用:

# ml_anomaly_detection.py
import requests
import json
from datetime import datetime, timedelta

class ElasticsearchMLClient:
    """Minimal REST client for the Elasticsearch Machine Learning APIs.

    Wraps the anomaly-detection job/datafeed endpoints and a small Slack
    alerting helper. All HTTP calls use basic auth, a JSON content type,
    and a bounded timeout so an unresponsive cluster cannot hang the
    caller (the original issued requests with no timeout at all).
    """

    # Default webhook used by send_alert(); replace with a real endpoint.
    DEFAULT_SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

    def __init__(self, host, username, password, timeout=30):
        """
        Args:
            host: Base URL of the cluster, e.g. "http://localhost:9200".
            username: Basic-auth user name.
            password: Basic-auth password.
            timeout: Per-request timeout in seconds (new parameter with a
                default, so existing callers are unaffected).
        """
        self.host = host
        self.auth = (username, password)
        self.headers = {'Content-Type': 'application/json'}
        self.timeout = timeout

    def create_ml_job(self, job_config):
        """Create (PUT) an anomaly-detection job; returns the parsed JSON reply.

        job_config must contain a 'job_id' key, which becomes part of the URL.
        """
        url = f"{self.host}/_ml/anomaly_detectors/{job_config['job_id']}"
        response = requests.put(url,
                                json=job_config,
                                auth=self.auth,
                                headers=self.headers,
                                timeout=self.timeout)
        return response.json()

    def start_datafeed(self, datafeed_id, start_time=None):
        """Start a datafeed, optionally from start_time; returns the JSON reply."""
        url = f"{self.host}/_ml/datafeeds/{datafeed_id}/_start"
        params = {}
        if start_time:
            params['start'] = start_time

        response = requests.post(url,
                                 params=params,
                                 auth=self.auth,
                                 headers=self.headers,
                                 timeout=self.timeout)
        return response.json()

    def get_anomalies(self, job_id, start_time, end_time, anomaly_score_threshold=50):
        """Fetch anomaly records for a job within [start_time, end_time].

        Only records with record_score >= anomaly_score_threshold are
        requested, sorted by score descending. Returns the parsed JSON reply
        (records are under the 'records' key).
        """
        url = f"{self.host}/_ml/anomaly_detectors/{job_id}/results/records"
        query = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": start_time,
                                    "lte": end_time
                                }
                            }
                        },
                        {
                            "range": {
                                "record_score": {
                                    "gte": anomaly_score_threshold
                                }
                            }
                        }
                    ]
                }
            },
            "sort": [
                {
                    "record_score": {
                        "order": "desc"
                    }
                }
            ]
        }

        response = requests.post(url,
                                 json=query,
                                 auth=self.auth,
                                 headers=self.headers,
                                 timeout=self.timeout)
        return response.json()

    def send_alert(self, anomaly):
        """Send a Slack alert describing a single anomaly record.

        Note: the original also built an 'alert_message' dict that was never
        used anywhere; that dead local has been removed.
        """
        slack_webhook = self.DEFAULT_SLACK_WEBHOOK
        slack_message = {
            "text": f"异常检测告警: {anomaly.get('field_name', '')} 异常分数: {anomaly['record_score']}",
            "attachments": [
                {
                    "color": "danger",
                    "fields": [
                        {
                            "title": "作业ID",
                            "value": anomaly['job_id'],
                            "short": True
                        },
                        {
                            "title": "时间",
                            "value": anomaly['timestamp'],
                            "short": True
                        },
                        {
                            "title": "异常分数",
                            "value": str(anomaly['record_score']),
                            "short": True
                        }
                    ]
                }
            ]
        }

        requests.post(slack_webhook, json=slack_message, timeout=self.timeout)

# Usage example
if __name__ == "__main__":
    client = ElasticsearchMLClient(
        host="http://localhost:9200",
        username="elastic",
        password="your_password"
    )
    
    # Look at the most recent one-hour window.
    window_end = datetime.now()
    window_start = window_end - timedelta(hours=1)
    
    results = client.get_anomalies(
        job_id="log_anomaly_detection",
        start_time=window_start.isoformat(),
        end_time=window_end.isoformat(),
        anomaly_score_threshold=75
    )
    
    # Only the highest-scoring records trigger a Slack alert.
    for record in results.get('records', []):
        if record['record_score'] > 90:
            client.send_alert(record)

2. 日志分类

文本分类模型:

{
  "job_id": "log_classification",
  "description": "日志消息分类",
  "analysis_config": {
    "categorization_field_name": "message",
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "categorization",
        "function": "count",
        "by_field_name": "mlcategory"
      }
    ],
    "categorization_filters": [
      "\\b\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\b",
      "\\b\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}",
      "\\b[A-Fa-f0-9]{8}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{4}-[A-Fa-f0-9]{12}\\b"
    ]
  },
  "data_description": {
    "time_field": "@timestamp"
  },
  "analysis_limits": {
    "model_memory_limit": "512mb",
    "categorization_examples_limit": 10
  }
}

安全分析

1. SIEM集成

安全事件检测规则:

{
  "rules": [
    {
      "name": "Brute Force Attack Detection",
      "description": "检测暴力破解攻击",
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "event.action": "login"
              }
            },
            {
              "term": {
                "event.outcome": "failure"
              }
            }
          ],
          "filter": [
            {
              "range": {
                "@timestamp": {
                  "gte": "now-5m"
                }
              }
            }
          ]
        }
      },
      "aggregation": {
        "terms": {
          "field": "source.ip",
          "min_doc_count": 10
        }
      },
      "threshold": 10,
      "severity": "high",
      "actions": [
        "block_ip",
        "send_alert"
      ]
    },
    {
      "name": "Privilege Escalation",
      "description": "检测权限提升",
      "query": {
        "bool": {
          "should": [
            {
              "match": {
                "message": "sudo"
              }
            },
            {
              "match": {
                "message": "su -"
              }
            },
            {
              "match": {
                "process.name": "sudo"
              }
            }
          ],
          "minimum_should_match": 1
        }
      },
      "severity": "medium",
      "actions": [
        "log_event",
        "send_notification"
      ]
    },
    {
      "name": "Suspicious File Access",
      "description": "检测可疑文件访问",
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "event.action": "file_access"
              }
            },
            {
              "bool": {
                "should": [
                  {
                    "wildcard": {
                      "file.path": "/etc/passwd*"
                    }
                  },
                  {
                    "wildcard": {
                      "file.path": "/etc/shadow*"
                    }
                  },
                  {
                    "wildcard": {
                      "file.path": "/root/.ssh/*"
                    }
                  }
                ]
              }
            }
          ]
        }
      },
      "severity": "critical",
      "actions": [
        "immediate_alert",
        "block_user"
      ]
    }
  ]
}

安全事件响应自动化:

# security_response.py
import json
from datetime import datetime, timezone

import requests

class SecurityEventHandler:
    """Automated responder for security events detected in the ELK SIEM.

    Coordinates three backends:
      * Elasticsearch      -- persistent audit log of security events
      * a firewall API     -- blocks offending source IPs
      * a notification API -- alerts operators
    """

    def __init__(self, elasticsearch_host, firewall_api, notification_api):
        self.es_host = elasticsearch_host
        self.firewall_api = firewall_api
        self.notification_api = notification_api

    def handle_brute_force_attack(self, source_ip, event_count):
        """Respond to a detected brute-force attack.

        Builds an ECS-style event document, blocks the source IP when the
        failure count is high enough, alerts operators, and persists the
        event to Elasticsearch.

        :param source_ip: offending source IP address
        :param event_count: number of failed login attempts observed
        :return: the event document, including results of every response action
        """
        # 1. Build the security event record.  Use a timezone-aware UTC
        #    timestamp: datetime.utcnow() is deprecated since Python 3.12.
        security_event = {
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "event": {
                "type": "security",
                "action": "brute_force_detected",
                "severity": "high"
            },
            "source": {
                "ip": source_ip
            },
            "threat": {
                "technique": "T1110",  # MITRE ATT&CK: Brute Force
                "tactic": "Credential Access"
            },
            "event_count": event_count,
            "response_actions": []
        }

        # 2. Block the IP only for sustained attacks (> 20 failed attempts).
        if event_count > 20:
            block_result = self.block_ip(source_ip)
            security_event["response_actions"].append({
                "action": "ip_blocked",
                "result": block_result
            })

        # 3. Always notify operators, regardless of the block decision.
        alert_result = self.send_security_alert(
            title="Brute Force Attack Detected",
            message=f"IP {source_ip} attempted {event_count} failed logins",
            severity="high"
        )
        security_event["response_actions"].append({
            "action": "alert_sent",
            "result": alert_result
        })

        # 4. Persist the enriched event for later forensics.
        self.log_security_event(security_event)

        return security_event

    def block_ip(self, ip_address):
        """Ask the firewall API to block *ip_address* for one hour.

        Best-effort: failures are reported in the return value, never raised.
        """
        try:
            response = requests.post(
                f"{self.firewall_api}/block",
                json={"ip": ip_address, "duration": "1h"},
                timeout=10
            )
            return {"success": True, "response": response.json()}
        except Exception as e:  # deliberate best-effort: report, don't propagate
            return {"success": False, "error": str(e)}

    def send_security_alert(self, title, message, severity):
        """Send an alert to the notification backend.  Best-effort."""
        try:
            alert_data = {
                "title": title,
                "message": message,
                "severity": severity,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "source": "ELK-SIEM"
            }

            response = requests.post(
                f"{self.notification_api}/alert",
                json=alert_data,
                timeout=10
            )
            return {"success": True, "response": response.json()}
        except Exception as e:  # deliberate best-effort: report, don't propagate
            return {"success": False, "error": str(e)}

    def log_security_event(self, event):
        """Index *event* into the ``security-events`` index in Elasticsearch.

        :return: the Elasticsearch response body, or ``None`` on failure.
        """
        try:
            response = requests.post(
                f"{self.es_host}/security-events/_doc",
                json=event,
                headers={"Content-Type": "application/json"},
                timeout=10  # was missing: an unreachable ES would hang forever
            )
            return response.json()
        except Exception as e:
            print(f"Failed to log security event: {e}")
            return None

2. 威胁情报集成

威胁情报丰富化:

# logstash-threat-intel.conf
filter {
  # IP-address threat-intel enrichment
  if [source][ip] {
    # Local reputation list lookup (fast path, no network call)
    translate {
      source => "[source][ip]"
      target => "[threat][reputation]"
      dictionary_path => "/etc/logstash/threat_intel/ip_reputation.yml"
      fallback => "unknown"
    }
    
    # External threat-intel API lookup
    # NOTE(review): hard-coded API key -- move it into the Logstash keystore
    http {
      url => "https://api.threatintel.com/v1/ip/%{[source][ip]}"
      headers => {
        "Authorization" => "Bearer YOUR_API_KEY"
      }
      target_body => "[threat][intel]"
      verb => "GET"
    }
  }
  
  # Domain reputation lookup
  if [url][domain] {
    translate {
      source => "[url][domain]"
      target => "[threat][domain_reputation]"
      dictionary_path => "/etc/logstash/threat_intel/domain_reputation.yml"
      fallback => "unknown"
    }
  }
  
  # File-hash (SHA-256) lookup against VirusTotal
  if [file][hash][sha256] {
    http {
      url => "https://api.virustotal.com/api/v3/files/%{[file][hash][sha256]}"
      headers => {
        "x-apikey" => "YOUR_VIRUSTOTAL_API_KEY"
      }
      target_body => "[threat][virustotal]"
      verb => "GET"
    }
  }
  
  # Aggregate threat score (IP + domain + VirusTotal contributions) and level
  ruby {
    code => "
      threat_score = 0
      
      # IP威胁评分
      if event.get('[threat][reputation]') == 'malicious'
        threat_score += 50
      elsif event.get('[threat][reputation]') == 'suspicious'
        threat_score += 25
      end
      
      # 域名威胁评分
      if event.get('[threat][domain_reputation]') == 'malicious'
        threat_score += 30
      end
      
      # VirusTotal评分
      vt_data = event.get('[threat][virustotal]')
      if vt_data && vt_data['data'] && vt_data['data']['attributes']
        malicious_count = vt_data['data']['attributes']['last_analysis_stats']['malicious']
        if malicious_count && malicious_count > 0
          threat_score += [malicious_count * 5, 40].min
        end
      end
      
      event.set('[threat][score]', threat_score)
      
      # 威胁等级
      if threat_score >= 80
        event.set('[threat][level]', 'critical')
      elsif threat_score >= 60
        event.set('[threat][level]', 'high')
      elsif threat_score >= 40
        event.set('[threat][level]', 'medium')
      elsif threat_score >= 20
        event.set('[threat][level]', 'low')
      else
        event.set('[threat][level]', 'info')
      end
    "
  }
}

output {
  # Immediate Slack alert for high-threat events
  if [threat][level] == "critical" or [threat][level] == "high" {
    http {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "🚨 High Threat Event Detected"
        "attachments" => [{
          "color" => "danger"
          "fields" => [{
            "title" => "Threat Level"
            "value" => "%{[threat][level]}"
            "short" => true
          }, {
            "title" => "Threat Score"
            "value" => "%{[threat][score]}"
            "short" => true
          }, {
            "title" => "Source IP"
            "value" => "%{[source][ip]}"
            "short" => true
          }, {
            "title" => "Timestamp"
            "value" => "%{@timestamp}"
            "short" => true
          }]
        }]
      }
    }
  }
  
  # All enriched events go to the daily security index
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "security-events-%{+YYYY.MM.dd}"
  }
}

APM性能监控

1. APM Agent集成

Java APM Agent配置:

# elastic-apm-agent.properties
# Elastic APM Java agent settings for the user-service in production.
service_name=user-service
service_version=1.0.0
environment=production
server_urls=http://apm-server:8200
secret_token=your_secret_token

# Application performance capture (sample every transaction; capture bodies/headers)
transaction_sample_rate=1.0
capture_body=all
capture_headers=true
stack_trace_limit=50
span_frames_min_duration=5ms

# Log correlation: inject trace ids into logs and reformat them to ECS
log_correlation_enabled=true
log_ecs_reformatting=true

# Metrics reporting interval and suppressed system metrics
metrics_interval=30s
disable_metrics=system.cpu.total.norm.pct,system.memory.usage,system.memory.actual.free

# Distributed tracing across services; also trace these methods explicitly
distributed_tracing_enabled=true
trace_methods=com.example.service.*

# Error capture (IllegalArgumentException is considered expected and ignored)
capture_exceptions=true
ignore_exceptions=java.lang.IllegalArgumentException

Spring Boot集成:

// pom.xml
<dependency>
    <groupId>co.elastic.apm</groupId>
    <artifactId>apm-agent-attach</artifactId>
    <version>1.34.1</version>
</dependency>

// Application.java
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        // Attach the Elastic APM agent programmatically; must run before the
        // Spring context starts so instrumentation is in place for all beans.
        ElasticApmAttacher.attach();
        SpringApplication.run(Application.class, args);
    }
}

// APMConfiguration.java
@Configuration
public class APMConfiguration {
    
    /** Expose the APM API entry point as an injectable Spring bean. */
    @Bean
    public ElasticApm elasticApm() {
        return ElasticApm.get();
    }
    
    @EventListener
    public void handleApplicationReady(ApplicationReadyEvent event) {
        // Tag the current transaction with deployment metadata.
        // NOTE(review): at ApplicationReadyEvent there may be no active
        // transaction, in which case these labels go to a noop -- verify.
        ElasticApm.currentTransaction()
            .setLabel("service.version", "1.0.0")
            .setLabel("deployment.environment", "production");
    }
}

// CustomMetrics.java
@Component
public class CustomMetrics {
    
    // Counter for application-level errors.
    // (Removed the former `private final Timer.Sample sample;` field: it was
    // never assigned, and an unassigned final field is a javac compile error.)
    private final Counter errorCounter;
    
    public CustomMetrics() {
        // NOTE(review): getMetricRegistry() is not part of the public Elastic
        // APM Java agent API in all versions -- confirm against the agent
        // version actually in use.
        this.errorCounter = ElasticApm.get()
            .getMetricRegistry()
            .counter("custom.errors.total");
    }
    
    @EventListener
    public void handleError(ErrorEvent event) {
        errorCounter.increment();
        
        // Record the error handling as a dedicated child span.
        Span span = ElasticApm.currentSpan()
            .createSpan()
            .setName("error.processing")
            .setType("custom");
        
        try {
            span.setLabel("error.type", event.getErrorType());
            span.setLabel("error.message", event.getMessage());
        } finally {
            // Always close the span, even if labelling throws.
            span.end();
        }
    }
}

// TransactionInterceptor.java
@Component
public class TransactionInterceptor implements HandlerInterceptor {
    
    @Override
    public boolean preHandle(HttpServletRequest request, 
                           HttpServletResponse response, 
                           Object handler) {
        // Start one APM transaction per HTTP request, named "METHOD /uri".
        // NOTE(review): the transaction is never activate()d here, so the
        // currentTransaction() lookup in afterCompletion may return a noop
        // instead of this instance -- confirm against the agent API docs.
        Transaction transaction = ElasticApm.startTransaction();
        transaction.setName(request.getMethod() + " " + request.getRequestURI());
        transaction.setType(Transaction.TYPE_REQUEST);
        
        // Attach the authenticated user, if an upstream gateway forwarded one.
        String userId = request.getHeader("X-User-ID");
        if (userId != null) {
            transaction.setUser(userId, null, null, null);
        }
        
        return true;
    }
    
    @Override
    public void afterCompletion(HttpServletRequest request, 
                              HttpServletResponse response, 
                              Object handler, Exception ex) {
        // Close the transaction with the HTTP status as its result and
        // report any exception that escaped the handler.
        Transaction transaction = ElasticApm.currentTransaction();
        if (transaction != null) {
            transaction.setResult(String.valueOf(response.getStatus()));
            if (ex != null) {
                transaction.captureException(ex);
            }
            transaction.end();
        }
    }
}

2. APM Server配置

APM Server配置文件:

# apm-server.yml
apm-server:
  host: "0.0.0.0:8200"
  
  # Authentication for agents
  auth:
    secret_token: "your_secret_token"
    api_key:
      enabled: true
      limit: 100
  
  # Request handling limits and timeouts
  max_request_size: 1048576
  max_header_size: 1048576
  idle_timeout: 45s
  read_timeout: 30s
  write_timeout: 30s
  shutdown_timeout: 5s
  
  # Sampling: drop unsampled transactions entirely
  sampling:
    keep_unsampled: false
    
  # Real User Monitoring (browser agents)
  rum:
    enabled: true
    event_rate:
      limit: 300
      lru_size: 1000
    # NOTE(review): wildcard origins/headers are wide open -- tighten for prod
    allow_origins: ["*"]
    allow_headers: ["*"]
    library_pattern: "node_modules|bower_components|~"
    exclude_from_grouping: "^/webpack"
    
  # Write to data streams instead of classic indices
  data_streams:
    enabled: true
    namespace: "default"

# Elasticsearch output
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  username: "apm_system"
  password: "${ELASTICSEARCH_PASSWORD}"
  
  # Route each APM event type to its own (daily) index
  indices:
    - index: "apm-%{[observer.version]}-sourcemap"
      when.contains:
        processor.event: "sourcemap"
    - index: "apm-%{[observer.version]}-error-%{+yyyy.MM.dd}"
      when.contains:
        processor.event: "error"
    - index: "apm-%{[observer.version]}-transaction-%{+yyyy.MM.dd}"
      when.contains:
        processor.event: "transaction"
    - index: "apm-%{[observer.version]}-span-%{+yyyy.MM.dd}"
      when.contains:
        processor.event: "span"
    - index: "apm-%{[observer.version]}-metric-%{+yyyy.MM.dd}"
      when.contains:
        processor.event: "metric"
  
  # Index template applied to apm-* indices
  template:
    enabled: true
    pattern: "apm-*"
    settings:
      index:
        number_of_shards: 1
        number_of_replicas: 1
        refresh_interval: "5s"

# Self-monitoring shipped to the same cluster
monitoring:
  enabled: true
  elasticsearch:
    hosts: ["elasticsearch:9200"]
    username: "apm_system"
    password: "${ELASTICSEARCH_PASSWORD}"

# File logging, keep 7 rotated files
logging:
  level: info
  to_files: true
  files:
    path: /var/log/apm-server
    name: apm-server
    keepfiles: 7
    permissions: 0644

3. 性能监控仪表板

Kibana APM仪表板配置:

{
  "version": "8.11.0",
  "objects": [
    {
      "id": "apm-service-overview",
      "type": "dashboard",
      "attributes": {
        "title": "APM Service Overview",
        "hits": 0,
        "description": "应用性能监控概览",
        "panelsJSON": "[\n  {\n    \"version\": \"8.11.0\",\n    \"gridData\": {\n      \"x\": 0,\n      \"y\": 0,\n      \"w\": 24,\n      \"h\": 15,\n      \"i\": \"1\"\n    },\n    \"panelIndex\": \"1\",\n    \"embeddableConfig\": {},\n    \"panelRefName\": \"panel_1\"\n  }\n]",
        "timeRestore": false,
        "timeTo": "now",
        "timeFrom": "now-24h",
        "refreshInterval": {
          "pause": false,
          "value": 10000
        },
        "kibanaSavedObjectMeta": {
          "searchSourceJSON": "{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"
        }
      },
      "references": [
        {
          "name": "panel_1",
          "type": "visualization",
          "id": "apm-transaction-overview"
        }
      ]
    },
    {
      "id": "apm-transaction-overview",
      "type": "visualization",
      "attributes": {
        "title": "Transaction Overview",
        "visState": "{\"title\":\"Transaction Overview\",\"type\":\"line\",\"params\":{\"grid\":{\"categoryLines\":false,\"style\":{\"color\":\"#eee\"}},\"categoryAxes\":[{\"id\":\"CategoryAxis-1\",\"type\":\"category\",\"position\":\"bottom\",\"show\":true,\"style\":{},\"scale\":{\"type\":\"linear\"},\"labels\":{\"show\":true,\"truncate\":100},\"title\":{}}],\"valueAxes\":[{\"id\":\"ValueAxis-1\",\"name\":\"LeftAxis-1\",\"type\":\"value\",\"position\":\"left\",\"show\":true,\"style\":{},\"scale\":{\"type\":\"linear\",\"mode\":\"normal\"},\"labels\":{\"show\":true,\"rotate\":0,\"filter\":false,\"truncate\":100},\"title\":{\"text\":\"Count\"}}],\"seriesParams\":[{\"show\":true,\"type\":\"line\",\"mode\":\"normal\",\"data\":{\"label\":\"Count\",\"id\":\"1\"},\"valueAxis\":\"ValueAxis-1\",\"drawLinesBetweenPoints\":true,\"showCircles\":true}],\"addTooltip\":true,\"addLegend\":true,\"legendPosition\":\"right\",\"times\":[],\"addTimeMarker\":false},\"aggs\":[{\"id\":\"1\",\"enabled\":true,\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"enabled\":true,\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"auto\",\"customInterval\":\"2h\",\"min_doc_count\":1,\"extended_bounds\":{}}}]}",
        "uiStateJSON": "{}",
        "description": "",
        "version": 1,
        "kibanaSavedObjectMeta": {
          "searchSourceJSON": "{\"index\":\"apm-*-transaction-*\",\"query\":{\"match_all\":{}},\"filter\":[]}"
        }
      }
    }
  ]
}

数据管道优化

1. 数据流优化

Logstash管道优化配置:

# logstash.yml
node.name: logstash-node-1
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d
path.logs: /var/log/logstash

# Pipeline tuning: 4 workers, up to 1000 events per batch, 50 ms fill delay
pipeline:
  workers: 4
  batch:
    size: 1000
    delay: 50
  
# Disk-backed queue survives restarts; capped at 1 GB
queue:
  type: persisted
  max_bytes: 1gb
  checkpoint.writes: 1024
  
# Config hot-reload every 3 s
# (was mislabeled "JVM配置" -- JVM heap settings live in jvm.options)
config:
  reload:
    automatic: true
    interval: 3s
  
# Ship Logstash self-monitoring data to Elasticsearch
monitoring:
  enabled: true
  elasticsearch:
    hosts: ["http://elasticsearch:9200"]
    username: "logstash_system"
    password: "${LOGSTASH_SYSTEM_PASSWORD}"

# Keep events that repeatedly fail processing instead of dropping them
dead_letter_queue:
  enable: true
  max_bytes: 1024mb
  
# Logging: warn level to cut log noise
log:
  level: warn
  
# Monitoring/management HTTP API endpoint
api:
  http:
    host: 0.0.0.0
    port: 9600

多管道配置:

# pipelines.yml
# One isolated pipeline per log family, so a slow or failing output in one
# family cannot back-pressure the others.

# Web access logs: moderate volume, durable queue
- pipeline.id: web-logs
  path.config: "/etc/logstash/conf.d/web-logs.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  queue.type: persisted
  queue.max_bytes: 512mb
  
# Application logs: highest volume, largest durable queue
- pipeline.id: app-logs
  path.config: "/etc/logstash/conf.d/app-logs.conf"
  pipeline.workers: 3
  pipeline.batch.size: 1000
  queue.type: persisted
  queue.max_bytes: 1gb
  
# Security logs: small batches and in-memory queue for lowest latency
- pipeline.id: security-logs
  path.config: "/etc/logstash/conf.d/security-logs.conf"
  pipeline.workers: 1
  pipeline.batch.size: 100
  queue.type: memory
  
# Metrics: bulk-friendly large batches
- pipeline.id: metrics
  path.config: "/etc/logstash/conf.d/metrics.conf"
  pipeline.workers: 2
  pipeline.batch.size: 2000
  queue.type: persisted
  queue.max_bytes: 2gb

2. 缓存优化

Redis缓存集成:

# logstash-redis-cache.conf
# NOTE(review): a `redis` *filter* plugin is not bundled with stock Logstash
# (only redis input/output are) -- confirm the community plugin providing
# these lookups is actually installed.
filter {
  # GeoIP enrichment with a Redis read-through cache (db 0)
  if [source][ip] {
    redis {
      host => "redis"
      port => 6379
      db => 0
      key => "geoip:%{[source][ip]}"
      data_type => "get"
      target => "[source][geo_cache]"
    }
    
    # Cache miss: resolve locally, then store the result
    if ![source][geo_cache] {
      geoip {
        source => "[source][ip]"
        target => "[source][geo]"
      }
      
      redis {
        host => "redis"
        port => 6379
        db => 0
        key => "geoip:%{[source][ip]}"
        data_type => "set"
        value => "%{[source][geo]}"
        ttl => 86400  # expires after 24 hours
      }
    } else {
      # Cache hit: reuse the cached geo data
      mutate {
        add_field => { "[source][geo]" => "%{[source][geo_cache]}" }
      }
    }
  }
  
  # User-agent parsing with a Redis read-through cache (db 1)
  if [user_agent][original] {
    redis {
      host => "redis"
      port => 6379
      db => 1
      key => "useragent:%{[user_agent][original]}"
      data_type => "get"
      target => "[user_agent][cache]"
    }
    
    if ![user_agent][cache] {
      useragent {
        source => "[user_agent][original]"
        target => "[user_agent]"
      }
      
      redis {
        host => "redis"
        port => 6379
        db => 1
        key => "useragent:%{[user_agent][original]}"
        data_type => "set"
        value => "%{[user_agent]}"
        ttl => 604800  # expires after 7 days
      }
    } else {
      mutate {
        add_field => { "[user_agent]" => "%{[user_agent][cache]}" }
      }
    }
  }
}

3. 数据压缩与归档

索引生命周期管理:

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "10gb",
            "max_age": "1d",
            "max_docs": 10000000
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "1d",
        "actions": {
          "set_priority": {
            "priority": 50
          },
          "allocate": {
            "number_of_replicas": 0,
            "include": {},
            "exclude": {},
            "require": {
              "data_tier": "warm"
            }
          },
          "forcemerge": {
            "max_num_segments": 1
          }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "set_priority": {
            "priority": 0
          },
          "allocate": {
            "number_of_replicas": 0,
            "include": {},
            "exclude": {},
            "require": {
              "data_tier": "cold"
            }
          }
        }
      },
      "frozen": {
        "min_age": "30d",
        "actions": {
          "searchable_snapshot": {
            "snapshot_repository": "s3-repository"
          }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

企业级解决方案

1. 高可用架构

多数据中心部署:

# elasticsearch-cluster.yml
# Multi-datacenter production cluster: 5 dedicated master-eligible nodes
# spread over 3 DCs, rack/zone-aware shard allocation, TLS everywhere.
cluster:
  name: production-elk-cluster
  initial_master_nodes:
    - es-master-dc1-1
    - es-master-dc1-2
    - es-master-dc2-1
    - es-master-dc2-2
    - es-master-dc3-1
  
  routing:
    allocation:
      # Spread shard copies across racks/zones; never place two copies
      # of the same shard on one physical host
      awareness:
        attributes: rack_id,zone
      same_shard:
        host: true
  
  remote:
    connect: true
    
node:
  name: ${NODE_NAME}
  roles: ["master", "data", "ingest"]
  attr:
    rack_id: ${RACK_ID}
    zone: ${ZONE}
    
network:
  host: 0.0.0.0
  publish_host: ${PUBLISH_HOST}
  
discovery:
  seed_hosts:
    - es-master-dc1-1:9300
    - es-master-dc1-2:9300
    - es-master-dc2-1:9300
    - es-master-dc2-2:9300
    - es-master-dc3-1:9300
  
  # NOTE(review): discovery.zen.minimum_master_nodes was removed in ES 7+;
  # alongside cluster.initial_master_nodes above it is ignored or rejected
  # at startup -- confirm the target ES version before keeping this.
  zen:
    minimum_master_nodes: 3
    
gateway:
  # Delay shard recovery until the expected cluster size is reached
  expected_nodes: 9
  expected_master_nodes: 5
  expected_data_nodes: 6
  recover_after_time: 5m
  
path:
  data: ["/data1/elasticsearch", "/data2/elasticsearch"]
  logs: /var/log/elasticsearch
  
# Lock the JVM heap into RAM (requires the memlock ulimit)
bootstrap:
  memory_lock: true
  
xpack:
  security:
    enabled: true
    # TLS on the transport (node-to-node) layer
    transport:
      ssl:
        enabled: true
        verification_mode: certificate
        keystore:
          path: certs/elastic-certificates.p12
        truststore:
          path: certs/elastic-certificates.p12
    # TLS on the HTTP (client) layer
    http:
      ssl:
        enabled: true
        keystore:
          path: certs/elastic-certificates.p12
  
  monitoring:
    collection:
      enabled: true
      
  ml:
    enabled: true
    
  watcher:
    enabled: true

2. 灾难恢复

备份恢复策略:

#!/bin/bash
# backup-elk-cluster.sh
# Snapshot the Elasticsearch cluster to S3 and archive Kibana saved objects.

# Abort on the first failing command.
set -e

# Timestamped snapshot name, repository, and cluster credentials.
# ELASTICSEARCH_PASSWORD must be provided via the environment.
BACKUP_DATE=$(date +%Y%m%d_%H%M%S)
BACKUP_REPO="s3-backup-repo"
SNAPSHOT_NAME="elk-backup-${BACKUP_DATE}"
ES_HOST="https://elasticsearch:9200"
ES_USER="elastic"
ES_PASS="${ELASTICSEARCH_PASSWORD}"

# Register the S3 snapshot repository (idempotent: PUT overwrites settings).
create_repository() {
    echo "Creating snapshot repository..."
    curl -X PUT "${ES_HOST}/_snapshot/${BACKUP_REPO}" \
        -u "${ES_USER}:${ES_PASS}" \
        -H "Content-Type: application/json" \
        -d '{
            "type": "s3",
            "settings": {
                "bucket": "elk-backups",
                "region": "us-west-2",
                "base_path": "elasticsearch-snapshots",
                "compress": true,
                "server_side_encryption": true
            }
        }'
}

# Snapshot every index, including cluster global state.
create_snapshot() {
    echo "Creating snapshot: ${SNAPSHOT_NAME}"
    curl -X PUT "${ES_HOST}/_snapshot/${BACKUP_REPO}/${SNAPSHOT_NAME}" \
        -u "${ES_USER}:${ES_PASS}" \
        -H "Content-Type: application/json" \
        -d '{
            "indices": "*",
            "ignore_unavailable": true,
            "include_global_state": true,
            "metadata": {
                "taken_by": "backup-script",
                "taken_because": "scheduled backup"
            }
        }'
}

# Poll the snapshot every 30 s until it reaches a terminal state
# (SUCCESS -> continue; FAILED -> abort the whole script).
check_snapshot_status() {
    echo "Checking snapshot status..."
    while true; do
        STATUS=$(curl -s "${ES_HOST}/_snapshot/${BACKUP_REPO}/${SNAPSHOT_NAME}" \
                     -u "${ES_USER}:${ES_PASS}" | \
                 jq -r '.snapshots[0].state')
        
        if [ "$STATUS" = "SUCCESS" ]; then
            echo "Snapshot completed successfully"
            break
        elif [ "$STATUS" = "FAILED" ]; then
            echo "Snapshot failed"
            exit 1
        else
            # Still IN_PROGRESS / STARTED -- keep waiting
            echo "Snapshot in progress... Status: $STATUS"
            sleep 30
        fi
    done
}

# Delete snapshots older than 30 days.
# NOTE(review): relies on GNU `date -d` parsing the YYYYMMDD cutoff and on
# the quote-break that expands $(date ...) inside the single-quoted jq
# program -- verify on the target host (this fails on BSD/macOS date).
cleanup_old_snapshots() {
    echo "Cleaning up old snapshots..."
    CUTOFF_DATE=$(date -d "30 days ago" +%Y%m%d)
    
    curl -s "${ES_HOST}/_snapshot/${BACKUP_REPO}/_all" \
        -u "${ES_USER}:${ES_PASS}" | \
    jq -r '.snapshots[] | select(.start_time_in_millis < ('$(date -d "$CUTOFF_DATE" +%s)' * 1000)) | .snapshot' | \
    while read -r old_snapshot; do
        echo "Deleting old snapshot: $old_snapshot"
        curl -X DELETE "${ES_HOST}/_snapshot/${BACKUP_REPO}/${old_snapshot}" \
            -u "${ES_USER}:${ES_PASS}"
    done
}

# Dump all Kibana saved objects (up to 10k docs) to a dated JSON file.
# NOTE(review): queries the .kibana system index directly; newer stacks
# restrict system-index access -- the saved-objects export API is preferred.
backup_kibana_config() {
    echo "Backing up Kibana configuration..."
    curl -X POST "${ES_HOST}/.kibana/_search" \
        -u "${ES_USER}:${ES_PASS}" \
        -H "Content-Type: application/json" \
        -d '{
            "query": {"match_all": {}},
            "size": 10000
        }' > "/backup/kibana-config-${BACKUP_DATE}.json"
}

# Main flow: register repo, snapshot, verify, prune, archive Kibana, notify.
# Fix: the function was closed with `fi` (a syntax error -- there is no
# matching `if`); it must be closed with `}`.
main() {
    echo "Starting ELK backup process..."
    
    create_repository
    create_snapshot
    check_snapshot_status
    cleanup_old_snapshots
    backup_kibana_config
    
    echo "Backup process completed successfully"
    
    # Notify the on-call Slack channel.
    curl -X POST "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
        -H "Content-Type: application/json" \
        -d "{
            \"text\": \"✅ ELK Cluster backup completed successfully\",
            \"attachments\": [{
                \"color\": \"good\",
                \"fields\": [{
                    \"title\": \"Snapshot Name\",
                    \"value\": \"${SNAPSHOT_NAME}\",
                    \"short\": true
                }, {
                    \"title\": \"Backup Date\",
                    \"value\": \"${BACKUP_DATE}\",
                    \"short\": true
                }]
            }]
        }"
}

main "$@"

3. 性能监控与告警

综合监控脚本:

# elk_monitoring.py
import requests
import json
import time
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class ELKMonitor:
    """Monitor an Elasticsearch cluster and dispatch alerts.

    Runs a set of checks against the cluster REST API (cluster health,
    per-node stats, per-index stats) and sends the resulting alerts via
    email and/or Slack, depending on which channels the config enables.
    """

    def __init__(self, config):
        """Store connection settings.

        Args:
            config: dict with an 'elasticsearch' section (host, username,
                password), a 'thresholds' section, and optional
                'email'/'slack' notification sections.
        """
        self.config = config
        self.es_host = config['elasticsearch']['host']
        self.es_auth = (config['elasticsearch']['username'],
                        config['elasticsearch']['password'])

    def check_cluster_health(self):
        """Check overall cluster health; return a list of alert dicts."""
        try:
            response = requests.get(
                f"{self.es_host}/_cluster/health",
                auth=self.es_auth,
                timeout=10
            )
            # Fail fast on HTTP errors instead of parsing an error body.
            response.raise_for_status()
            health = response.json()

            alerts = []

            if health['status'] == 'red':
                alerts.append({
                    'severity': 'critical',
                    'message': 'Cluster status is RED',
                    'details': health
                })
            elif health['status'] == 'yellow':
                alerts.append({
                    'severity': 'warning',
                    'message': 'Cluster status is YELLOW',
                    'details': health
                })

            if health['number_of_nodes'] < self.config['thresholds']['min_nodes']:
                alerts.append({
                    'severity': 'critical',
                    'message': f"Node count below threshold: {health['number_of_nodes']}",
                    'details': health
                })

            return alerts

        except Exception as e:
            # Any failure to reach or parse the API is itself critical.
            return [{
                'severity': 'critical',
                'message': f'Failed to check cluster health: {str(e)}',
                'details': {}
            }]

    def check_node_stats(self):
        """Check per-node JVM heap and disk usage; return alert dicts."""
        try:
            response = requests.get(
                f"{self.es_host}/_nodes/stats",
                auth=self.es_auth,
                timeout=10
            )
            response.raise_for_status()
            stats = response.json()

            alerts = []

            for node_id, node_stats in stats['nodes'].items():
                node_name = node_stats['name']

                # JVM heap usage (ES already reports it as a percentage).
                heap_used_percent = node_stats['jvm']['mem']['heap_used_percent']
                if heap_used_percent > self.config['thresholds']['heap_usage']:
                    alerts.append({
                        'severity': 'warning',
                        'message': f'High heap usage on {node_name}: {heap_used_percent}%',
                        'details': node_stats['jvm']['mem']
                    })

                # BUG FIX: the nodes-stats API returns fs.data as a *list*
                # of path objects, not a dict -- the original code called
                # .items() on it, which raises AttributeError.
                for fs_stats in node_stats['fs'].get('data', []):
                    total = fs_stats.get('total_in_bytes', 0)
                    if not total:
                        continue  # avoid division by zero on empty mounts
                    disk_used_percent = (total - fs_stats['available_in_bytes']) / total * 100
                    if disk_used_percent > self.config['thresholds']['disk_usage']:
                        alerts.append({
                            'severity': 'warning',
                            'message': f'High disk usage on {node_name}: {disk_used_percent:.1f}%',
                            'details': fs_stats
                        })

            return alerts

        except Exception as e:
            return [{
                'severity': 'critical',
                'message': f'Failed to check node stats: {str(e)}',
                'details': {}
            }]

    def check_index_stats(self):
        """Check index sizes and per-index health; return alert dicts."""
        try:
            response = requests.get(
                f"{self.es_host}/_cat/indices?format=json&bytes=b",
                auth=self.es_auth,
                timeout=10
            )
            response.raise_for_status()
            indices = response.json()

            alerts = []

            for index in indices:
                # store.size is null for red/unassigned indices, so guard
                # before int() (original crashed on None).
                size = index.get('store.size')
                if size is not None and int(size) > self.config['thresholds']['index_size']:
                    alerts.append({
                        'severity': 'info',
                        'message': f'Large index detected: {index["index"]} ({size} bytes)',
                        'details': index
                    })

                if index.get('health') == 'red':
                    alerts.append({
                        'severity': 'critical',
                        'message': f'Index {index["index"]} is in RED state',
                        'details': index
                    })

            return alerts

        except Exception as e:
            return [{
                'severity': 'critical',
                'message': f'Failed to check index stats: {str(e)}',
                'details': {}
            }]

    def _build_message(self, alerts):
        """Format *alerts* into a text report grouped by severity."""
        critical_alerts = [a for a in alerts if a['severity'] == 'critical']
        warning_alerts = [a for a in alerts if a['severity'] == 'warning']
        info_alerts = [a for a in alerts if a['severity'] == 'info']

        message = f"ELK Cluster Alert - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"

        if critical_alerts:
            message += "🚨 CRITICAL ALERTS:\n"
            for alert in critical_alerts:
                message += f"- {alert['message']}\n"
            message += "\n"

        if warning_alerts:
            message += "⚠️ WARNING ALERTS:\n"
            for alert in warning_alerts:
                message += f"- {alert['message']}\n"
            message += "\n"

        if info_alerts:
            message += "ℹ️ INFO ALERTS:\n"
            for alert in info_alerts:
                message += f"- {alert['message']}\n"

        return message

    def send_alert(self, alerts):
        """Send *alerts* through every enabled notification channel."""
        if not alerts:
            return

        critical_alerts = [a for a in alerts if a['severity'] == 'critical']
        message = self._build_message(alerts)

        if self.config.get('email', {}).get('enabled', False):
            self.send_email_alert(message, critical_alerts)

        if self.config.get('slack', {}).get('enabled', False):
            self.send_slack_alert(message, critical_alerts)

    def send_email_alert(self, message, critical_alerts):
        """Send *message* via SMTP; failures are printed, never raised."""
        try:
            email_config = self.config['email']

            msg = MIMEMultipart()
            msg['From'] = email_config['from']
            msg['To'] = ', '.join(email_config['to'])
            msg['Subject'] = f"ELK Cluster Alert - {len(critical_alerts)} Critical Issues"
            msg.attach(MIMEText(message, 'plain'))

            server = smtplib.SMTP(email_config['smtp_host'], email_config['smtp_port'])
            try:
                server.starttls()
                server.login(email_config['username'], email_config['password'])
                server.send_message(msg)
            finally:
                # BUG FIX: the original leaked the connection whenever
                # starttls/login/send raised; always close it.
                server.quit()

        except Exception as e:
            print(f"Failed to send email alert: {e}")

    def send_slack_alert(self, message, critical_alerts):
        """Post *message* to the configured Slack webhook."""
        try:
            slack_config = self.config['slack']

            color = "danger" if critical_alerts else "warning"

            payload = {
                "text": "ELK Cluster Alert",
                "attachments": [{
                    "color": color,
                    "text": message,
                    "footer": "ELK Monitor",
                    "ts": int(time.time())
                }]
            }

            # Timeout added so a hung webhook cannot block monitoring.
            requests.post(slack_config['webhook_url'], json=payload, timeout=10)

        except Exception as e:
            print(f"Failed to send Slack alert: {e}")

    def run_monitoring(self):
        """Run all checks once, dispatch alerts, return the alert list."""
        print(f"Starting ELK monitoring at {datetime.now()}")

        all_alerts = []
        all_alerts.extend(self.check_cluster_health())
        all_alerts.extend(self.check_node_stats())
        all_alerts.extend(self.check_index_stats())

        if all_alerts:
            self.send_alert(all_alerts)
            print(f"Found {len(all_alerts)} alerts")
        else:
            print("No alerts found")

        return all_alerts

# Example configuration (replace credentials and the webhook URL with real values).
config = {
    "elasticsearch": {
        "host": "https://elasticsearch:9200",
        "username": "elastic",
        "password": "your_password"
    },
    "thresholds": {
        "min_nodes": 3,
        "heap_usage": 85,  # percent of JVM heap
        "disk_usage": 85,  # percent of data-path capacity
        "index_size": 10737418240  # 10GB
    },
    "email": {
        "enabled": True,
        "smtp_host": "smtp.gmail.com",
        "smtp_port": 587,
        "username": "your_email@gmail.com",
        "password": "your_password",
        "from": "elk-monitor@company.com",
        "to": ["admin@company.com"]
    },
    "slack": {
        "enabled": True,
        "webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
    }
}

if __name__ == "__main__":
    monitor = ELKMonitor(config)

    # Run a single one-off check.
    monitor.run_monitoring()

    # Or run continuously:
    # while True:
    #     monitor.run_monitoring()
    #     time.sleep(300)  # check every 5 minutes

总结

本章详细介绍了ELK Stack的高级应用与集成,包括:

核心内容

  1. 微服务架构集成 - 分布式追踪、服务网格集成
  2. 云平台部署 - AWS、Kubernetes部署方案
  3. 第三方工具集成 - Prometheus、Grafana、Slack集成
  4. 机器学习应用 - 异常检测、日志分类
  5. 安全分析 - SIEM集成、威胁情报
  6. APM性能监控 - 应用性能监控集成
  7. 数据管道优化 - 缓存、压缩、归档策略
  8. 企业级解决方案 - 高可用、灾难恢复、监控告警

技术亮点

  • 完整的微服务日志聚合架构
  • 云原生部署最佳实践
  • 智能化异常检测和告警
  • 企业级安全分析能力
  • 全方位性能监控体系

实践价值

  • 提供生产级部署指导
  • 实现智能化运维管理
  • 构建完整的可观测性体系
  • 满足企业级安全合规要求

下一章将学习ELK Stack的故障排除与性能调优,帮助您解决生产环境中的常见问题。