Data Collection Overview

Data Collection Architecture

graph TB
    subgraph "数据源层"
        A1[Web服务器日志]
        A2[应用程序日志]
        A3[系统指标]
        A4[数据库日志]
        A5[网络数据包]
        A6[安全事件]
    end
    
    subgraph "收集层"
        B1[Filebeat]
        B2[Metricbeat]
        B3[Packetbeat]
        B4[Winlogbeat]
        B5[Auditbeat]
        B6[Heartbeat]
    end
    
    subgraph "处理层"
        C1[Logstash Pipeline 1]
        C2[Logstash Pipeline 2]
        C3[Logstash Pipeline 3]
    end
    
    subgraph "存储层"
        D[Elasticsearch]
    end
    
    A1 --> B1
    A2 --> B1
    A3 --> B2
    A4 --> B1
    A5 --> B3
    A6 --> B5
    
    B1 --> C1
    B2 --> C2
    B3 --> C3
    B4 --> C1
    B5 --> C3
    B6 --> C2
    
    C1 --> D
    C2 --> D
    C3 --> D

Data Flow Processing Patterns

1. Direct mode:

Beats → Elasticsearch

2. Buffered mode:

Beats → Logstash → Elasticsearch

3. Queued mode:

Beats → Message Queue → Logstash → Elasticsearch

4. Hybrid mode:

Beats → Logstash → Message Queue → Logstash → Elasticsearch
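
A minimal sketch of the queued pattern, assuming a Kafka cluster at kafka1:9092/kafka2:9092 and a topic named filebeat-logs (both placeholder names); Filebeat publishes events to Kafka and Logstash consumes them:

# filebeat.yml (producer side)
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]
  topic: "filebeat-logs"
  required_acks: 1
  compression: gzip

# Logstash pipeline (consumer side)
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["filebeat-logs"]
    group_id => "logstash"
    codec => "json"
  }
}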

Data Collection with Beats

1. Filebeat Configuration in Detail

Basic configuration example:

# filebeat.yml
filebeat.inputs:
# Web server access logs
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  fields:
    log_type: nginx
    environment: production
  fields_under_root: true
  exclude_lines: ['^DEBUG']

# Web server error logs (multiline merge keyed on the error.log timestamp format)
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  fields:
    log_type: nginx_error
    environment: production
  fields_under_root: true
  multiline.pattern: '^\d{4}/\d{2}/\d{2}'
  multiline.negate: true
  multiline.match: after
  
# Application logs
- type: log
  enabled: true
  paths:
    - /var/log/app/*.log
  fields:
    log_type: application
    service: web-api
  fields_under_root: true
  json.keys_under_root: true
  json.add_error_key: true
  json.message_key: message
  
# Container logs
- type: container
  enabled: true
  paths:
    - '/var/lib/docker/containers/*/*.log'
  processors:
    - add_docker_metadata:
        host: "unix:///var/run/docker.sock"
        
# Module configuration
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

# Processor configuration
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata:
      host: ${NODE_NAME}
      matchers:
      - logs_path:
          logs_path: "/var/log/containers/"
          
# Output configuration
output.logstash:
  hosts: ["logstash1:5044", "logstash2:5044"]
  loadbalance: true
  compression_level: 3
  
# Logging configuration
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
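
Before rolling the shipper out, the configuration and output connectivity can be checked from the command line; a quick sanity check, assuming the file lives at /etc/filebeat/filebeat.yml:

# Validate the configuration syntax
filebeat test config -c /etc/filebeat/filebeat.yml

# Test connectivity to the configured output (Logstash in this case)
filebeat test output -c /etc/filebeat/filebeat.yml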

Module configuration examples:

# modules.d/nginx.yml
- module: nginx
  access:
    enabled: true
    var.paths: ["/var/log/nginx/access.log*"]
  error:
    enabled: true
    var.paths: ["/var/log/nginx/error.log*"]
    
# modules.d/apache.yml
- module: apache
  access:
    enabled: true
    var.paths: ["/var/log/apache2/access.log*"]
  error:
    enabled: true
    var.paths: ["/var/log/apache2/error.log*"]
    
# modules.d/mysql.yml
- module: mysql
  error:
    enabled: true
    var.paths: ["/var/log/mysql/error.log*"]
  slowlog:
    enabled: true
    var.paths: ["/var/log/mysql/mysql-slow.log*"]

2. System Monitoring with Metricbeat

# metricbeat.yml
metricbeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: true
  reload.period: 10s

# System modules
metricbeat.modules:
# System metrics
- module: system
  metricsets:
    - cpu
    - load
    - memory
    - network
    - process
    - process_summary
    - socket_summary
    - filesystem
    - fsstat
    - diskio
  enabled: true
  period: 10s
  processes: ['.*']
  cpu.metrics: ["percentages", "normalized_percentages"]
  core.metrics: ["percentages"]
  
# Docker monitoring
- module: docker
  metricsets:
    - container
    - cpu
    - diskio
    - healthcheck
    - info
    - memory
    - network
  hosts: ["unix:///var/run/docker.sock"]
  period: 10s
  enabled: true
  
# Kubernetes monitoring
- module: kubernetes
  metricsets:
    - node
    - system
    - pod
    - container
    - volume
  period: 10s
  host: ${NODE_NAME}
  hosts: ["https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}"]
  
# Processors
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  
# Output (a custom index name also requires the template name/pattern below)
output.elasticsearch:
  hosts: ["elasticsearch1:9200", "elasticsearch2:9200"]
  index: "metricbeat-%{+yyyy.MM.dd}"

# Index template settings (top-level setup options, not children of the output)
setup.template.name: "metricbeat"
setup.template.pattern: "metricbeat-*"
setup.template.settings:
  index.number_of_shards: 1
  index.codec: best_compression
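
As with Filebeat, modules and connectivity can be verified from the command line before deployment:

# Enable additional modules (the system module is enabled by default)
metricbeat modules enable docker kubernetes

# Validate the configuration and the module settings
metricbeat test config
metricbeat test modules

# Test connectivity to Elasticsearch
metricbeat test output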

3. Network Monitoring with Packetbeat

# packetbeat.yml
packetbeat.interfaces.device: any
packetbeat.interfaces.snaplen: 1514
packetbeat.interfaces.type: af_packet
packetbeat.interfaces.buffer_size_mb: 100

# Protocol configuration
packetbeat.protocols:
# HTTP monitoring
- type: http
  ports: [80, 8080, 8000, 5000, 8002]
  hide_keywords: ["pass", "password", "passwd"]
  send_headers: true
  send_all_headers: true
  split_cookie: true
  real_ip_header: "X-Forwarded-For"
  
# MySQL monitoring
- type: mysql
  ports: [3306]
  max_rows: 10
  max_row_length: 1024
  
# Redis monitoring
- type: redis
  ports: [6379]
  
# DNS monitoring
- type: dns
  ports: [53]
  include_authorities: true
  include_additionals: true
  
# TLS monitoring
- type: tls
  ports: [443, 993, 995, 5223, 8443, 8883, 9243]
  
# Flow configuration
packetbeat.flows:
  timeout: 30s
  period: 10s
  
# Processors
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - detect_mime_type:
      field: http.request.body.content
      target: http.request.mime_type
      
# Output (a custom index name also requires a matching template name/pattern)
output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "packetbeat-%{+yyyy.MM.dd}"

setup.template.name: "packetbeat"
setup.template.pattern: "packetbeat-*"

Data Processing with Logstash

1. Input Plugin Configuration

Beats input:

input {
  beats {
    port => 5044
    host => "0.0.0.0"
    client_inactivity_timeout => 300
    include_codec_tag => false
  }
}

File input:

input {
  file {
    path => "/var/log/app/*.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    tags => ["application", "multiline"]
  }
}

TCP/UDP input:

input {
  tcp {
    port => 5000
    codec => json_lines
    tags => ["tcp", "json"]
  }
  
  udp {
    port => 5001
    codec => plain
    tags => ["udp", "syslog"]
  }
}

HTTP input:

input {
  http {
    host => "0.0.0.0"
    port => 8080
    codec => json
    additional_codecs => {
      "application/json" => "json"
      "text/plain" => "plain"
    }
    tags => ["http", "webhook"]
  }
}
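
The HTTP input can be exercised directly with curl, which is convenient when wiring up webhooks; a quick smoke test against the port configured above:

# Send a JSON test event to the Logstash http input
curl -s -X POST "http://localhost:8080" \
  -H "Content-Type: application/json" \
  -d '{"message": "test event", "level": "INFO"}'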

2. Filter Plugins in Detail

Grok parsing:

filter {
  if [log_type] == "nginx" {
    grok {
      match => { 
        "message" => "%{NGINXACCESS}" 
      }
      patterns_dir => ["/etc/logstash/patterns"]
      tag_on_failure => ["_grokparsefailure"]
    }
  }
  
  if [log_type] == "apache" {
    grok {
      match => { 
        "message" => "%{COMBINEDAPACHELOG}" 
      }
    }
  }
  
  # Custom Grok pattern
  if [log_type] == "application" {
    grok {
      match => { 
        "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] %{GREEDYDATA:log_message}" 
      }
    }
  }
}
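
NGINXACCESS is not one of the patterns shipped with Logstash, so the patterns_dir referenced above has to define it; a commonly used definition (a sketch, the field names are one possible choice) looks like this:

# /etc/logstash/patterns/nginx
NGINXACCESS %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent}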

Date parsing:

filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
    timezone => "UTC"
  }
  
  # Multiple date formats
  date {
    match => [ 
      "log_timestamp", 
      "yyyy-MM-dd HH:mm:ss",
      "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
      "MMM dd HH:mm:ss"
    ]
  }
}

Field operations:

filter {
  mutate {
    # Add fields
    add_field => { 
      "environment" => "production"
      "processed_by" => "logstash"
    }
    
    # Rename fields
    rename => { 
      "host" => "hostname"
      "message" => "original_message"
    }
    
    # Remove fields
    remove_field => [ "beat", "input", "prospector" ]
    
    # Type conversion
    convert => { 
      "response_time" => "float"
      "status_code" => "integer"
      "bytes_sent" => "integer"
    }
    
    # String operations
    lowercase => [ "method", "protocol" ]
    uppercase => [ "log_level" ]
    strip => [ "user_agent" ]
    
    # Split a field
    split => { "tags" => "," }
    
    # Merge fields
    merge => { "all_errors" => "error_details" }
  }
}

Conditional processing:

filter {
  if [log_type] == "nginx" {
    if [status] >= 400 {
      mutate {
        add_tag => [ "error" ]
      }
    }
    
    if [status] >= 500 {
      mutate {
        add_tag => [ "server_error" ]
      }
    }
  }
  
  # Complex conditions
  if [log_type] == "application" and [level] == "ERROR" {
    mutate {
      add_field => { "alert_required" => "true" }
    }
  }
  
  # Regular expression matching (case-insensitive via the inline (?i) flag)
  if [user_agent] =~ /(?i)bot|crawler|spider/ {
    mutate {
      add_tag => [ "bot_traffic" ]
    }
  }
}

Data Parsing and Transformation

1. JSON Data Processing

filter {
  # JSON parsing
  if [log_type] == "application" {
    json {
      source => "message"
      target => "parsed"
      skip_on_invalid_json => true
    }
    
    # Extract JSON fields
    if [parsed] {
      mutate {
        add_field => {
          "app_name" => "%{[parsed][application]}"
          "log_level" => "%{[parsed][level]}"
          "thread_name" => "%{[parsed][thread]}"
        }
      }
    }
  }
}

2. XML Data Processing

filter {
  xml {
    source => "message"
    target => "parsed_xml"
    xpath => [
      "/log/level/text()", "log_level",
      "/log/message/text()", "log_message",
      "/log/@timestamp", "log_timestamp"
    ]
  }
}

3. CSV Data Processing

filter {
  csv {
    source => "message"
    separator => ","
    columns => [ "timestamp", "level", "component", "message", "thread" ]
    skip_empty_columns => true
  }
}

4. GeoIP Resolution

filter {
  # GeoIP lookup
  geoip {
    source => "client_ip"
    target => "geoip"
    database => "/etc/logstash/GeoLite2-City.mmdb"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  
  # Convert coordinates to float
  mutate {
    convert => [ "[geoip][coordinates]", "float"]
  }
}
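
For the coordinates array to be usable on a map visualization, the target index needs a geo_point mapping; a minimal fragment of a (hypothetical) index template such as the logs-template.json referenced in the output examples:

{
  "index_patterns": ["logs-*"],
  "mappings": {
    "properties": {
      "geoip": {
        "properties": {
          "coordinates": { "type": "geo_point" },
          "location": { "type": "geo_point" }
        }
      }
    }
  }
}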

5. User Agent Parsing

filter {
  useragent {
    source => "user_agent"
    target => "ua"
    prefix => "ua_"
  }
}

Data Routing and Filtering

1. Multi-Pipeline Configuration

pipelines.yml configuration:

# pipelines.yml
- pipeline.id: web-logs
  path.config: "/etc/logstash/conf.d/web-*.conf"
  pipeline.workers: 4
  pipeline.batch.size: 1000
  
- pipeline.id: app-logs
  path.config: "/etc/logstash/conf.d/app-*.conf"
  pipeline.workers: 2
  pipeline.batch.size: 500
  
- pipeline.id: security-logs
  path.config: "/etc/logstash/conf.d/security-*.conf"
  pipeline.workers: 1
  pipeline.batch.size: 100
  queue.type: persisted
  queue.max_bytes: 1gb

2. Conditional Routing

# Route events by log type
output {
  if [log_type] == "nginx" {
    elasticsearch {
      hosts => ["es-web:9200"]
      index => "nginx-logs-%{+YYYY.MM.dd}"
    }
  }
  
  if [log_type] == "application" {
    elasticsearch {
      hosts => ["es-app:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
    }
  }
  
  if [log_type] == "security" {
    elasticsearch {
      hosts => ["es-security:9200"]
      index => "security-logs-%{+YYYY.MM.dd}"
    }
  }
  
  # Additional handling for error logs
  if [level] == "ERROR" or [status] >= 500 {
    email {
      to => "admin@example.com"
      subject => "Critical Error Detected"
      body => "Error: %{message}"
    }
  }
}

3. Data Filtering

filter {
  # Drop debug logs
  if [level] == "DEBUG" {
    drop { }
  }
  
  # Drop health check requests
  if [request] =~ /\/health|\/_status/ {
    drop { }
  }
  
  # Drop static asset requests
  if [request] =~ /\.(css|js|png|jpg|gif|ico)$/ {
    drop { }
  }
  
  # Sampling
  if [log_type] == "debug" {
    if [sample_rate] {
      ruby {
        code => "
          if rand() > event.get('sample_rate').to_f
            event.cancel
          end
        "
      }
    }
  }
}

Performance Optimization

1. Pipeline Tuning

# logstash.yml performance settings
pipeline.workers: 8
pipeline.batch.size: 2000
pipeline.batch.delay: 50

# Queue configuration
queue.type: persisted
queue.max_bytes: 2gb
queue.checkpoint.writes: 1024

# Automatic configuration reload
config.reload.automatic: true
config.reload.interval: 3s

# Monitoring (legacy internal collection)
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]

2. Memory Tuning

# jvm.options tuning
-Xms4g
-Xmx4g
-XX:+UseG1GC
-XX:+UseStringDeduplication
-XX:+AlwaysPreTouch

3. Output Tuning

output {
  elasticsearch {
    hosts => ["es1:9200", "es2:9200", "es3:9200"]
    index => "logs-%{+YYYY.MM.dd}"

    # Batch sizing is controlled by pipeline.batch.size in logstash.yml;
    # the old flush_size / idle_flush_time options were removed from the plugin

    # Connection pool configuration
    pool_max => 1000
    pool_max_per_route => 100

    # Retry configuration
    retry_max_interval => 5

    # Template configuration
    template_overwrite => true
    template => "/etc/logstash/templates/logs-template.json"
    template_name => "logs"
  }
}

Monitoring and Debugging

1. Pipeline Monitoring

# View pipeline stats
curl -X GET "localhost:9600/_node/stats/pipelines?pretty"

# View hot threads
curl -X GET "localhost:9600/_node/hot_threads?pretty"

# View installed plugins
curl -X GET "localhost:9600/_node/plugins?pretty"

2. Performance Monitoring Script

#!/bin/bash
# logstash-monitor.sh

LOGSTASH_API="http://localhost:9600"

# Fetch pipeline statistics
get_pipeline_stats() {
    echo "Pipeline Statistics:"
    curl -s "$LOGSTASH_API/_node/stats/pipelines" | jq -r '
        .pipelines | to_entries[] | 
        "Pipeline: \(.key)" +
        "\n  Events In: \(.value.events.in)" +
        "\n  Events Out: \(.value.events.out)" +
        "\n  Events Filtered: \(.value.events.filtered)" +
        "\n  Duration (ms): \(.value.events.duration_in_millis)" +
        "\n  Queue Push Duration (ms): \(.value.events.queue_push_duration_in_millis)\n"
    '
}

# Fetch JVM statistics
get_jvm_stats() {
    echo "JVM Statistics:"
    curl -s "$LOGSTASH_API/_node/stats/jvm" | jq -r '
        "Heap Used: \(.jvm.mem.heap_used_percent)%" +
        "\nHeap Max: \(.jvm.mem.heap_max_in_bytes / 1024 / 1024 | floor)MB" +
        "\nGC Count: \(.jvm.gc.collectors.old.collection_count)" +
        "\nGC Time: \(.jvm.gc.collectors.old.collection_time_in_millis)ms"
    '
}

# Main entry point
main() {
    echo "Logstash Monitoring - $(date)"
    echo "================================"
    get_pipeline_stats
    echo
    get_jvm_stats
    echo "================================"
}

main

3. Debug Configuration

# Debug output
output {
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
  
  # File output for debugging
  file {
    path => "/var/log/logstash/debug-%{+YYYY.MM.dd}.log"
    codec => json_lines
  }
}
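
Pipeline configuration files can also be checked before they are (re)loaded; two commands that are useful while debugging, assuming a package install with configs under /etc/logstash/conf.d:

# Validate the pipeline configuration and exit
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/ --config.test_and_exit

# Run an ad-hoc pipeline from stdin to stdout for quick experiments
/usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout { codec => rubydebug } }'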

Practical Examples

1. Web Server Log Analysis

Complete configuration example:

# web-logs.conf
input {
  beats {
    port => 5044
    type => "web"
  }
}

filter {
  if [fields][log_type] == "nginx" {
    grok {
      match => { 
        "message" => "%{NGINXACCESS}" 
      }
    }
    
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }
    
    geoip {
      source => "clientip"
      target => "geoip"
    }
    
    useragent {
      source => "agent"
      target => "useragent"
    }
    
    mutate {
      convert => { 
        "response" => "integer"
        "bytes" => "integer"
        "responsetime" => "float"
      }
    }
    
    # Categorize by response time
    if [responsetime] {
      if [responsetime] < 0.1 {
        mutate { add_field => { "response_category" => "fast" } }
      } else if [responsetime] < 1.0 {
        mutate { add_field => { "response_category" => "normal" } }
      } else {
        mutate { add_field => { "response_category" => "slow" } }
      }
    }
    
    # Categorize by status code
    if [response] {
      if [response] >= 200 and [response] < 300 {
        mutate { add_field => { "status_category" => "success" } }
      } else if [response] >= 300 and [response] < 400 {
        mutate { add_field => { "status_category" => "redirect" } }
      } else if [response] >= 400 and [response] < 500 {
        mutate { add_field => { "status_category" => "client_error" } }
      } else if [response] >= 500 {
        mutate { add_field => { "status_category" => "server_error" } }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "nginx-access-%{+YYYY.MM.dd}"
    template_overwrite => true
    template => "/etc/logstash/templates/nginx-template.json"
    template_name => "nginx"
  }
  
  # Alert on server errors
  if [response] >= 500 {
    email {
      to => "ops@example.com"
      subject => "Server Error Alert"
      body => "Server error detected: %{message}"
    }
  }
}

2. Application Log Processing

# app-logs.conf
input {
  beats {
    port => 5045
    type => "application"
  }
}

filter {
  if [fields][log_type] == "application" {
    # Multiline events (e.g. stack traces) should be merged on the Filebeat side
    # using the multiline.* settings; the Logstash multiline filter is deprecated
    # and is not safe behind a multi-threaded beats input, so no merging is done here
    
    # JSON log parsing
    if [message] =~ /^\{/ {
      json {
        source => "message"
        skip_on_invalid_json => true
      }
    } else {
      # Structured (plain text) log parsing
      grok {
        match => { 
          "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{LOGLEVEL:level}\] \[%{DATA:thread}\] %{DATA:logger} - %{GREEDYDATA:log_message}" 
        }
      }
    }
    
    # Exception stack handling
    if [log_message] =~ /Exception|Error/ {
      mutate {
        add_tag => [ "exception" ]
      }
      
      # Extract the exception class
      grok {
        match => { 
          "log_message" => "%{JAVACLASS:exception_class}" 
        }
        tag_on_failure => []
      }
    }
    
    # Performance log handling
    if [logger] =~ /performance/ {
      grok {
        match => { 
          "log_message" => "Method: %{DATA:method_name}, Duration: %{NUMBER:duration:float}ms" 
        }
      }
      
      if [duration] {
        if [duration] > 1000 {
          mutate { add_tag => [ "slow_method" ] }
        }
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "application-logs-%{+YYYY.MM.dd}"
  }
  
  # Exception alerting
  if "exception" in [tags] {
    slack {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      channel => "#alerts"
      username => "logstash"
      text => "Exception detected: %{exception_class} - %{log_message}"
    }
  }
}

3. Security Log Analysis

# security-logs.conf
input {
  beats {
    port => 5046
    type => "security"
  }
}

filter {
  if [fields][log_type] == "auth" {
    grok {
      match => { 
        "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:hostname} %{WORD:program}(?:\[%{POSINT:pid}\])?: %{GREEDYDATA:log_message}" 
      }
    }
    
    # SSH login analysis
    if [program] == "sshd" {
      if [log_message] =~ /Failed password/ {
        grok {
          match => { 
            "log_message" => "Failed password for %{DATA:username} from %{IP:source_ip} port %{INT:source_port}" 
          }
        }
        mutate { add_tag => [ "ssh_failed_login" ] }
      }
      
      if [log_message] =~ /Accepted password/ {
        grok {
          match => { 
            "log_message" => "Accepted password for %{DATA:username} from %{IP:source_ip} port %{INT:source_port}" 
          }
        }
        mutate { add_tag => [ "ssh_successful_login" ] }
      }
    }
    
    # GeoIP analysis
    if [source_ip] {
      geoip {
        source => "source_ip"
        target => "geoip"
      }
    }
    
    # Threat detection
    if "ssh_failed_login" in [tags] {
      # Check against known malicious IPs
      translate {
        source => "source_ip"
        target => "threat_level"
        dictionary_path => "/etc/logstash/threat_ips.yml"
        fallback => "unknown"
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "security-logs-%{+YYYY.MM.dd}"
  }
  
  # Security alerting
  if "ssh_failed_login" in [tags] {
    # Forward to the SIEM system
    http {
      url => "https://siem.example.com/api/alerts"
      http_method => "post"
      format => "json"
      headers => {
        "Authorization" => "Bearer YOUR_TOKEN"
      }
    }
  }
}
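
The translate filter above expects the dictionary file to map source values to target values; a minimal sketch of /etc/logstash/threat_ips.yml (the addresses are placeholders from documentation ranges):

# /etc/logstash/threat_ips.yml
"203.0.113.15": "high"
"198.51.100.42": "medium"
"192.0.2.200": "low"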

Summary

This chapter covered data collection and processing in the ELK Stack, including:

Key Points

  1. Data collection: configuring and using the Beats family of shippers
  2. Data processing: Logstash input, filter, and output configuration
  3. Data parsing: parsing and transforming a variety of data formats
  4. Data routing: condition-based routing and filtering
  5. Performance optimization: pipeline tuning, memory tuning, batch processing
  6. Monitoring and debugging: performance monitoring, troubleshooting, and debugging techniques

Best Practices

  • Design the data collection architecture deliberately
  • Optimize Grok patterns to keep parsing fast
  • Use conditionals to skip unnecessary processing
  • Configure appropriate batch sizes and worker counts
  • Put solid monitoring and alerting in place
  • Review and clean up configuration files regularly

In the next chapter, we will cover Elasticsearch index management and query optimization.