
Logstash Architecture Deep Dive

1. Core Architecture Components

graph TB
    subgraph "Logstash进程"
        A[Input Plugins] --> B[Input Queue]
        B --> C[Filter Workers]
        C --> D[Output Queue]
        D --> E[Output Plugins]
        
        subgraph "Filter Workers"
            C1[Worker 1]
            C2[Worker 2]
            C3[Worker 3]
            C4[Worker N]
        end
        
        C --> C1
        C --> C2
        C --> C3
        C --> C4
    end
    
    subgraph "持久化队列"
        F[Page Files]
        G[Checkpoint Files]
        H[Dead Letter Queue]
    end
    
    B -.-> F
    D -.-> G
    C -.-> H
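
The flow above - inputs feeding a queue that a pool of filter workers drains toward the outputs - can be inspected on a running instance through the monitoring API. A quick sketch, assuming the API on its default localhost:9600 and jq installed:

# inspect-pipeline.sh - view pipeline topology and queue state via the API
API="http://localhost:9600"

# Worker count and batch settings per pipeline
curl -s "$API/_node/pipelines" | jq '.pipelines | to_entries[] |
    {pipeline: .key, workers: .value.workers, batch_size: .value.batch_size}'

# Queue and dead letter queue statistics per pipeline
curl -s "$API/_node/stats/pipelines" | jq '.pipelines | to_entries[] |
    {pipeline: .key, queue: .value.queue, dlq: .value.dead_letter_queue}'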

2. Event Lifecycle

# Event object structure
{
  "@timestamp" => 2024-01-15T10:30:00.000Z,
  "@version" => "1",
  "@metadata" => {
    "beat" => "filebeat",
    "type" => "_doc",
    "version" => "8.11.0"
  },
  "message" => "原始日志消息",
  "host" => {
    "name" => "web-server-01"
  },
  "fields" => {
    "environment" => "production",
    "service" => "web-api"
  }
}
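
To see this structure for yourself, a throwaway stdin-to-stdout pipeline with the rubydebug codec prints every field, including @metadata. A sketch, assuming a package install under /usr/share/logstash:

# Print the full event object (including @metadata) for one test message
echo 'hello world' | /usr/share/logstash/bin/logstash -e '
  input  { stdin { } }
  output { stdout { codec => rubydebug { metadata => true } } }'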

3. Memory Management

# jvm.options - JVM memory settings
-Xms2g
-Xmx2g
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:+UseStringDeduplication
-XX:+AlwaysPreTouch
-XX:+ExitOnOutOfMemoryError

# Suggested heap allocation (rough guide)
# - Input buffers: 10-20%
# - Filter processing: 60-70%
# - Output buffers: 10-20%
# - System overhead: 10%
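
Whether the configured heap leaves enough headroom is easiest to judge from the JVM stats endpoint. A small check script; the 75% threshold is illustrative, not an official limit:

# heap-check.sh - warn when heap usage crosses a threshold
API="http://localhost:9600"
THRESHOLD=75  # illustrative threshold

heap=$(curl -s "$API/_node/stats/jvm" | jq -r '.jvm.mem.heap_used_percent')
if [ "$heap" -ge "$THRESHOLD" ]; then
    echo "WARNING: heap usage at ${heap}% (threshold ${THRESHOLD}%)"
else
    echo "OK: heap usage at ${heap}%"
fi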

Advanced Configuration Management

1. Multi-Pipeline Architecture

Advanced pipelines.yml configuration:

# pipelines.yml
# High-throughput web log pipeline
- pipeline.id: web-logs-high-volume
  path.config: "/etc/logstash/conf.d/web-*.conf"
  pipeline.workers: 8
  pipeline.batch.size: 2000
  pipeline.batch.delay: 50
  queue.type: persisted
  queue.max_bytes: 2gb
  queue.checkpoint.writes: 1024
  queue.checkpoint.interval: 1000
  
# Low-latency security log pipeline
- pipeline.id: security-logs-low-latency
  path.config: "/etc/logstash/conf.d/security-*.conf"
  pipeline.workers: 2
  pipeline.batch.size: 100
  pipeline.batch.delay: 5
  queue.type: memory
  
# Business-critical log pipeline (high reliability)
- pipeline.id: business-critical
  path.config: "/etc/logstash/conf.d/business-*.conf"
  pipeline.workers: 4
  pipeline.batch.size: 500
  pipeline.batch.delay: 10
  queue.type: persisted
  queue.max_bytes: 5gb
  queue.checkpoint.writes: 512
  dead_letter_queue.enable: true
  dead_letter_queue.max_bytes: 1gb
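
After Logstash loads this file, the per-pipeline settings can be confirmed via the node info API. A quick sketch against the default API address:

# Verify that each pipeline picked up its workers/batch settings
curl -s "http://localhost:9600/_node/pipelines" | jq '.pipelines | to_entries[] |
    {pipeline: .key, workers: .value.workers,
     batch_size: .value.batch_size, batch_delay: .value.batch_delay}'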

2. Configuration Templating

Base template configurations:

# templates/base-input.conf
input {
  beats {
    port => "${BEATS_PORT:5044}"
    host => "${BEATS_HOST:0.0.0.0}"
    client_inactivity_timeout => "${CLIENT_TIMEOUT:300}"
    include_codec_tag => false
  }
}

# templates/base-filter.conf
filter {
  # Add common fields
  mutate {
    add_field => {
      "[@metadata][pipeline]" => "${PIPELINE_ID}"
      "[@metadata][environment]" => "${ENVIRONMENT:production}"
      "[@metadata][datacenter]" => "${DATACENTER:dc1}"
    }
  }
  
  # Common timestamp handling: parse a raw string field when one exists
  # (@timestamp itself is always present and already a timestamp object)
  if [timestamp] {
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
    }
  }
  
  # Common host-field handling
  if [host] {
    mutate {
      rename => { "[host][name]" => "hostname" }
    }
  }
}

# templates/base-output.conf
output {
  elasticsearch {
    hosts => ["${ES_HOSTS:localhost:9200}"]
    index => "${INDEX_PREFIX:logstash}-%{[@metadata][environment]}-%{+YYYY.MM.dd}"
    
    # Note: flush_size / idle_flush_time were removed from this plugin in
    # Logstash 7+; bulk sizing is governed by pipeline.batch.size instead
    
    # Connection pool settings
    pool_max => "${POOL_MAX:1000}"
    pool_max_per_route => "${POOL_MAX_PER_ROUTE:100}"
    
    # Retry settings (retry_max_items was removed in newer plugin versions)
    retry_initial_interval => "${RETRY_INITIAL_INTERVAL:2}"
    retry_max_interval => "${RETRY_MAX_INTERVAL:5}"
    
    # Authentication
    user => "${ES_USER}"
    password => "${ES_PASSWORD}"
    ssl => "${ES_SSL:false}"
    ssl_certificate_verification => "${ES_SSL_VERIFY:true}"
  }
  
  # Debug output (optional)
  if "${DEBUG_OUTPUT:false}" == "true" {
    stdout {
      codec => rubydebug {
        metadata => true
      }
    }
  }
}

3. Environment Variable Management

Environment configuration files:

# environments/production.env
ENVIRONMENT=production
DATACENTER=dc1
BEATS_PORT=5044
BEATS_HOST=0.0.0.0
ES_HOSTS=es-prod-01:9200,es-prod-02:9200,es-prod-03:9200
ES_USER=logstash_writer
ES_PASSWORD=secure_password
ES_SSL=true
DEBUG_OUTPUT=false

# environments/staging.env
ENVIRONMENT=staging
DATACENTER=dc1
BEATS_PORT=5044
BEATS_HOST=0.0.0.0
ES_HOSTS=es-staging:9200
ES_USER=logstash_writer
ES_PASSWORD=staging_password
ES_SSL=false
DEBUG_OUTPUT=true

# environments/development.env
ENVIRONMENT=development
DATACENTER=local
BEATS_PORT=5044
BEATS_HOST=localhost
ES_HOSTS=localhost:9200
ES_USER=elastic
ES_PASSWORD=changeme
ES_SSL=false
DEBUG_OUTPUT=true
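
Logstash resolves ${VAR} references from the process environment, so these files must be exported before startup. A minimal sketch for a manual start; with systemd, an EnvironmentFile= directive in the unit file achieves the same:

# start-with-env.sh - export an env file and launch Logstash
ENV_FILE="environments/production.env"

set -a            # export every variable sourced below
source "$ENV_FILE"
set +a

exec /usr/share/logstash/bin/logstash --path.settings /etc/logstash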

4. Dynamic Configuration Reloading

# logstash.yml - dynamic reload settings
config.reload.automatic: true
config.reload.interval: 3s
config.support_escapes: true

# Validate configuration and exit (set true in CI; leave false in production)
config.test_and_exit: false

# Script to watch for configuration changes
#!/bin/bash
# config-watcher.sh

CONFIG_DIR="/etc/logstash/conf.d"

# Watch for configuration file changes
inotifywait -m -r -e modify,create,delete "$CONFIG_DIR" |
while read path action file; do
    echo "$(date): Configuration change detected: $action $file in $path"
    
    # Validate the configuration
    if /usr/share/logstash/bin/logstash --config.test_and_exit --path.config="$CONFIG_DIR"; then
        echo "Configuration validation passed"
        
        # Trigger a reload: with config.reload.automatic enabled Logstash picks
        # the change up on its own; otherwise send SIGHUP (there is no reload API)
        kill -HUP "$(systemctl show -p MainPID --value logstash)"
        echo "Configuration reload triggered"
    else
        echo "Configuration validation failed - reload skipped"
        # Send an alert
        echo "Invalid Logstash configuration detected" | mail -s "Logstash Config Error" admin@example.com
    fi
done

Plugin Development and Customization

1. Custom Filter Plugin

Ruby plugin development example:

# lib/logstash/filters/custom_parser.rb
require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::CustomParser < LogStash::Filters::Base
  config_name "custom_parser"
  
  # Configuration options
  config :source, :validate => :string, :required => true
  config :target, :validate => :string, :default => "parsed"
  config :pattern, :validate => :string, :required => true
  config :on_error, :validate => :string, :default => "tag"
  
  def register
    # Compile the regular expression
    @regex = Regexp.new(@pattern)
    @logger.info("Custom parser initialized with pattern: #{@pattern}")
  end
  
  def filter(event)
    source_value = event.get(@source)
    return unless source_value
    
    begin
      match = @regex.match(source_value.to_s)
      
      if match
        # Extract named capture groups
        parsed_data = {}
        match.names.each do |name|
          parsed_data[name] = match[name] if match[name]
        end
        
        # Store the parsed result
        event.set(@target, parsed_data) unless parsed_data.empty?
        
        # Tag success
        event.tag("_custom_parser_success")
        
        @logger.debug("Successfully parsed", :source => source_value, :result => parsed_data)
        
        # Apply any add_field/add_tag decorations only on success
        filter_matched(event)
      else
        handle_error(event, "Pattern did not match")
      end
      
    rescue => e
      handle_error(event, "Parsing error: #{e.message}")
    end
  end
  
  private
  
  def handle_error(event, message)
    case @on_error
    when "tag"
      event.tag("_custom_parser_failure")
    when "field"
      event.set("[@metadata][custom_parser_error]", message)
    when "drop"
      event.cancel
    end
    
    @logger.warn("Custom parser error", :message => message, :event => event.to_hash)
  end
end

Plugin gemspec:

# logstash-filter-custom_parser.gemspec
Gem::Specification.new do |s|
  s.name          = 'logstash-filter-custom_parser'
  s.version       = '1.0.0'
  s.licenses      = ['Apache-2.0']
  s.summary       = 'Custom parser filter for Logstash'
  s.description   = 'A custom filter plugin for parsing specific log formats'
  s.authors       = ['Your Name']
  s.email         = 'your.email@example.com'
  s.homepage      = 'https://github.com/yourusername/logstash-filter-custom_parser'
  s.require_paths = ['lib']

  # Files
  s.files = Dir['lib/**/*','spec/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']

  # Tests
  s.test_files = s.files.grep(%r{^(test|spec|features)/})

  # Special flag to let us know this is actually a logstash plugin
  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" }

  # Gem dependencies
  s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
  s.add_development_dependency 'logstash-devutils'
end
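
Building and installing the plugin follows the standard Logstash plugin workflow. A sketch, assuming the gemspec above sits in the plugin's root directory:

# Build the gem and install it into a local Logstash
cd logstash-filter-custom_parser
gem build logstash-filter-custom_parser.gemspec
/usr/share/logstash/bin/logstash-plugin install --no-verify \
    logstash-filter-custom_parser-1.0.0.gem

# Confirm the plugin is registered
/usr/share/logstash/bin/logstash-plugin list | grep custom_parser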

2. Custom Output Plugin

# lib/logstash/outputs/custom_webhook.rb
require "logstash/outputs/base"
require "logstash/namespace"
require "net/http"
require "json"

class LogStash::Outputs::CustomWebhook < LogStash::Outputs::Base
  config_name "custom_webhook"
  
  # A single Net::HTTP client is reused across events, so run single-threaded
  concurrency :single
  
  config :url, :validate => :string, :required => true
  config :method, :validate => ["post", "put"], :default => "post"
  config :headers, :validate => :hash, :default => {}
  config :format, :validate => ["json", "form"], :default => "json"
  config :timeout, :validate => :number, :default => 30
  config :retry_count, :validate => :number, :default => 3
  config :retry_delay, :validate => :number, :default => 1
  
  def register
    @uri = URI.parse(@url)
    @http = Net::HTTP.new(@uri.host, @uri.port)
    @http.use_ssl = (@uri.scheme == "https")
    @http.read_timeout = @timeout
    @http.open_timeout = @timeout
    
    @logger.info("Custom webhook output initialized", :url => @url)
  end
  
  def multi_receive(events)
    events.each do |event|
      send_event(event)
    end
  end
  
  private
  
  def send_event(event)
    retries = 0
    
    begin
      request = create_request(event)
      response = @http.request(request)
      
      if response.code.to_i >= 200 && response.code.to_i < 300
        @logger.debug("Event sent successfully", :response_code => response.code)
      else
        raise "HTTP error: #{response.code} #{response.message}"
      end
      
    rescue => e
      retries += 1
      
      if retries <= @retry_count
        @logger.warn("Retrying webhook request", :retry => retries, :error => e.message)
        sleep(@retry_delay * retries)
        retry
      else
        @logger.error("Failed to send webhook after #{@retry_count} retries", :error => e.message)
      end
    end
  end
  
  def create_request(event)
    # request_uri keeps the path and query string; an empty path becomes "/"
    case @method
    when "post"
      request = Net::HTTP::Post.new(@uri.request_uri)
    when "put"
      request = Net::HTTP::Put.new(@uri.request_uri)
    end
    
    # Set headers (values go through event.sprintf)
    @headers.each do |key, value|
      request[key] = event.sprintf(value)
    end
    
    # Set the request body
    case @format
    when "json"
      request['Content-Type'] = 'application/json'
      request.body = event.to_hash.to_json
    when "form"
      request['Content-Type'] = 'application/x-www-form-urlencoded'
      request.body = URI.encode_www_form(event.to_hash)
    end
    
    request
  end
end
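
For a quick smoke test, point the output at a throwaway local listener and inspect the raw request. A sketch using netcat; the port and pipeline are arbitrary, and since nc never answers, the plugin will log retries - fine for payload inspection:

# Terminal 1: a raw listener that prints whatever arrives
nc -l 8080        # some netcat variants need: nc -l -p 8080

# Terminal 2: push one generated event through the new output
/usr/share/logstash/bin/logstash -e '
  input  { generator { count => 1 message => "webhook test" } }
  output { custom_webhook { url => "http://localhost:8080/hook" } }'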

3. Plugin Testing

# spec/filters/custom_parser_spec.rb
require "logstash/devutils/rspec/spec_helper"
require "logstash/filters/custom_parser"

describe LogStash::Filters::CustomParser do
  let(:config) do
    {
      "source" => "message",
      "target" => "parsed",
      "pattern" => "(?<timestamp>\\d{4}-\\d{2}-\\d{2}) (?<level>\\w+) (?<message>.*)"
    }
  end
  
  subject { described_class.new(config) }
  
  before do
    subject.register
  end
  
  describe "successful parsing" do
    let(:event) { LogStash::Event.new("message" => "2024-01-15 INFO This is a test message") }
    
    it "parses the message correctly" do
      subject.filter(event)
      
      expect(event.get("[parsed][timestamp]")).to eq("2024-01-15")
      expect(event.get("[parsed][level]")).to eq("INFO")
      expect(event.get("[parsed][message]")).to eq("This is a test message")
      expect(event.get("tags")).to include("_custom_parser_success")
    end
  end
  
  describe "parsing failure" do
    let(:event) { LogStash::Event.new("message" => "invalid log format") }
    
    it "handles parsing failure" do
      subject.filter(event)
      
      expect(event.get("parsed")).to be_nil
      expect(event.get("tags")).to include("_custom_parser_failure")
    end
  end
end
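
The spec runs with the standard logstash-devutils tooling. A sketch, assuming a Gemfile in the plugin root that pulls in logstash-core and logstash-devutils:

# Install development dependencies and run the spec suite
cd logstash-filter-custom_parser
bundle install
bundle exec rspec spec/filters/custom_parser_spec.rb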

Cluster Deployment and Management

1. Cluster Architecture

graph TB
    subgraph "负载均衡层"
        LB[HAProxy/Nginx]
    end
    
    subgraph "Logstash集群"
        LS1[Logstash Node 1]
        LS2[Logstash Node 2]
        LS3[Logstash Node 3]
        LS4[Logstash Node 4]
    end
    
    subgraph "消息队列"
        MQ1[Redis Cluster]
        MQ2[Kafka Cluster]
    end
    
    subgraph "存储层"
        ES[Elasticsearch Cluster]
    end
    
    subgraph "监控层"
        MON[Monitoring Stack]
    end
    
    LB --> LS1
    LB --> LS2
    LB --> LS3
    LB --> LS4
    
    LS1 --> MQ1
    LS2 --> MQ1
    LS3 --> MQ2
    LS4 --> MQ2
    
    LS1 --> ES
    LS2 --> ES
    LS3 --> ES
    LS4 --> ES
    
    LS1 --> MON
    LS2 --> MON
    LS3 --> MON
    LS4 --> MON

2. Docker Cluster Deployment

docker-compose.yml:

version: '3.8'

services:
  # Logstash node 1 - web log processing
  logstash-web:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash-web
    environment:
      - "LS_JAVA_OPTS=-Xmx2g -Xms2g"
      - "PIPELINE_ID=web-logs"
      - "ENVIRONMENT=production"
    volumes:
      - ./config/logstash-web.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./pipeline/web:/usr/share/logstash/pipeline:ro
      - ./patterns:/usr/share/logstash/patterns:ro
      - logstash-web-data:/usr/share/logstash/data
    ports:
      - "5044:5044"
    networks:
      - elk-network
    depends_on:
      - elasticsearch
    deploy:
      resources:
        limits:
          memory: 3g
        reservations:
          memory: 2g
    restart: unless-stopped
    
  # Logstash node 2 - application log processing
  logstash-app:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash-app
    environment:
      - "LS_JAVA_OPTS=-Xmx2g -Xms2g"
      - "PIPELINE_ID=app-logs"
      - "ENVIRONMENT=production"
    volumes:
      - ./config/logstash-app.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./pipeline/app:/usr/share/logstash/pipeline:ro
      - ./patterns:/usr/share/logstash/patterns:ro
      - logstash-app-data:/usr/share/logstash/data
    ports:
      - "5045:5044"
    networks:
      - elk-network
    depends_on:
      - elasticsearch
    deploy:
      resources:
        limits:
          memory: 3g
        reservations:
          memory: 2g
    restart: unless-stopped
    
  # Logstash node 3 - security log processing
  logstash-security:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash-security
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
      - "PIPELINE_ID=security-logs"
      - "ENVIRONMENT=production"
    volumes:
      - ./config/logstash-security.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./pipeline/security:/usr/share/logstash/pipeline:ro
      - ./patterns:/usr/share/logstash/patterns:ro
      - logstash-security-data:/usr/share/logstash/data
    ports:
      - "5046:5044"
    networks:
      - elk-network
    depends_on:
      - elasticsearch
    deploy:
      resources:
        limits:
          memory: 2g
        reservations:
          memory: 1g
    restart: unless-stopped

  # HAProxy load balancer
  haproxy:
    image: haproxy:2.8
    container_name: logstash-lb
    volumes:
      - ./haproxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro
    ports:
      - "5040:5040"  # 统一入口
      - "8404:8404"  # 统计页面
    networks:
      - elk-network
    depends_on:
      - logstash-web
      - logstash-app
      - logstash-security
    restart: unless-stopped

volumes:
  logstash-web-data:
  logstash-app-data:
  logstash-security-data:

networks:
  elk-network:
    external: true

HAProxy configuration:

# haproxy/haproxy.cfg
global
    daemon
    log stdout local0
    chroot /var/lib/haproxy
    stats socket /run/haproxy/admin.sock mode 660 level admin
    stats timeout 30s
    user haproxy
    group haproxy

defaults
    mode tcp
    log global
    option tcplog
    option dontlognull
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms
    errorfile 400 /etc/haproxy/errors/400.http
    errorfile 403 /etc/haproxy/errors/403.http
    errorfile 408 /etc/haproxy/errors/408.http
    errorfile 500 /etc/haproxy/errors/500.http
    errorfile 502 /etc/haproxy/errors/502.http
    errorfile 503 /etc/haproxy/errors/503.http
    errorfile 504 /etc/haproxy/errors/504.http

# Stats page
frontend stats
    bind *:8404
    mode http
    stats enable
    stats uri /stats
    stats refresh 30s
    stats admin if TRUE

# Logstash load balancing
frontend logstash_frontend
    bind *:5040
    mode tcp
    default_backend logstash_backend

backend logstash_backend
    mode tcp
    balance roundrobin
    option tcp-check
    
    # Health check against the monitoring API (port 9600)
    tcp-check connect port 9600
    tcp-check send "GET /_node/stats HTTP/1.1\r\nHost: localhost\r\n\r\n"
    tcp-check expect string "200 OK"
    
    server logstash-web logstash-web:5044 check inter 5s rise 2 fall 3
    server logstash-app logstash-app:5044 check inter 5s rise 2 fall 3
    server logstash-security logstash-security:5044 check inter 5s rise 2 fall 3
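
When a backend flaps, it helps to reproduce the health probe by hand. A sketch, run from a container on the same network, plus a look at backend states through the stats endpoint's CSV export:

# Reproduce the health probe HAProxy performs against port 9600
curl -s -o /dev/null -w "%{http_code}\n" "http://logstash-web:9600/_node/stats"

# Backend name and status columns from the HAProxy stats CSV
curl -s "http://localhost:8404/stats;csv" | cut -d',' -f1,2,18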

3. Kubernetes Deployment

logstash-deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash
  namespace: logging
  labels:
    app: logstash
spec:
  replicas: 3
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
      - name: logstash
        image: docker.elastic.co/logstash/logstash:8.11.0
        ports:
        - containerPort: 5044
          name: beats
        - containerPort: 9600
          name: http
        env:
        - name: LS_JAVA_OPTS
          value: "-Xmx2g -Xms2g"
        - name: ELASTICSEARCH_HOSTS
          value: "http://elasticsearch:9200"
        resources:
          requests:
            memory: "2Gi"
            cpu: "500m"
          limits:
            memory: "3Gi"
            cpu: "2000m"
        volumeMounts:
        - name: config
          mountPath: /usr/share/logstash/config
        - name: pipeline
          mountPath: /usr/share/logstash/pipeline
        - name: patterns
          mountPath: /usr/share/logstash/patterns
        - name: data
          mountPath: /usr/share/logstash/data
        livenessProbe:
          httpGet:
            path: /
            port: 9600
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /_node/stats
            port: 9600
          initialDelaySeconds: 30
          periodSeconds: 10
      volumes:
      - name: config
        configMap:
          name: logstash-config
      - name: pipeline
        configMap:
          name: logstash-pipeline
      - name: patterns
        configMap:
          name: logstash-patterns
      - name: data
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: logstash
  namespace: logging
spec:
  selector:
    app: logstash
  ports:
  - name: beats
    port: 5044
    targetPort: 5044
  - name: http
    port: 9600
    targetPort: 9600
  type: ClusterIP
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-config
  namespace: logging
data:
  logstash.yml: |
    http.host: "0.0.0.0"
    path.config: /usr/share/logstash/pipeline
    path.logs: /usr/share/logstash/logs
    pipeline.workers: 4
    pipeline.batch.size: 1000
    pipeline.batch.delay: 50
    queue.type: persisted
    queue.max_bytes: 1gb
    monitoring.enabled: true
    monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
  jvm.options: |
    -Xms2g
    -Xmx2g
    -XX:+UseG1GC
    -XX:MaxGCPauseMillis=200
    -XX:+UseStringDeduplication
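
Rolling this out and checking that the pods came up healthy is routine kubectl work. A sketch:

# Deploy and watch the rollout
kubectl apply -f logstash-deployment.yaml
kubectl -n logging rollout status deployment/logstash

# Spot-check one pod's API through a port-forward
kubectl -n logging port-forward deployment/logstash 9600:9600 &
curl -s http://localhost:9600/_node/stats/jvm | jq '.jvm.mem.heap_used_percent'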

Performance Tuning and Monitoring

1. Performance Benchmarking

#!/bin/bash
# performance-test.sh

LOGSTASH_HOST="localhost"
LOGSTASH_PORT="5044"  # must point at a tcp input with a json_lines codec; nc cannot speak the beats protocol
TEST_DURATION="300"   # 5 minutes
CONCURRENT_CONNECTIONS="10"
MESSAGES_PER_SECOND="1000"

echo "Starting Logstash performance test..."
echo "Target: $LOGSTASH_HOST:$LOGSTASH_PORT"
echo "Duration: $TEST_DURATION seconds"
echo "Concurrent connections: $CONCURRENT_CONNECTIONS"
echo "Messages per second: $MESSAGES_PER_SECOND"

# Generate test data (one compact JSON document per line so a json_lines
# codec can split events; pretty-printed multi-line JSON would not parse)
generate_test_data() {
    local count=$1
    for ((i=1; i<=count; i++)); do
        printf '{"@timestamp":"%s","level":"INFO","message":"Test message %d from performance test","host":"test-host-%d","service":"performance-test","thread":"thread-%d","response_time":%d,"status_code":%d}\n' \
            "$(date -u +%Y-%m-%dT%H:%M:%S.%3NZ)" "$i" "$((i % 10))" "$((i % 4))" "$((RANDOM % 1000))" "$((200 + RANDOM % 300))"
    done
}

# Send test data
send_test_data() {
    local connection_id=$1
    local messages_per_conn=$(($MESSAGES_PER_SECOND / $CONCURRENT_CONNECTIONS))
    
    echo "Connection $connection_id: Sending $messages_per_conn messages/second"
    
    for ((i=1; i<=TEST_DURATION; i++)); do
        generate_test_data $messages_per_conn | nc $LOGSTASH_HOST $LOGSTASH_PORT
        sleep 1
    done
}

# Start concurrent connections
for ((i=1; i<=CONCURRENT_CONNECTIONS; i++)); do
    send_test_data $i &
    pids[${i}]=$!
done

# Monitor performance
monitor_performance() {
    while true; do
        echo "$(date): Checking Logstash performance..."
        
        # Fetch pipeline statistics
        curl -s "http://$LOGSTASH_HOST:9600/_node/stats/pipelines" | jq -r '
            .pipelines | to_entries[] | 
            "Pipeline: \(.key)" +
            "\n  Events In: \(.value.events.in)" +
            "\n  Events Out: \(.value.events.out)" +
            "\n  Events/sec: \(.value.events.in / (.value.events.duration_in_millis / 1000))" +
            "\n  Queue Push Duration: \(.value.events.queue_push_duration_in_millis)ms\n"
        '
        
        # Fetch JVM statistics
        curl -s "http://$LOGSTASH_HOST:9600/_node/stats/jvm" | jq -r '
            "JVM Stats:" +
            "\n  Heap Used: \(.jvm.mem.heap_used_percent)%" +
            "\n  Heap Max: \(.jvm.mem.heap_max_in_bytes / 1024 / 1024 | floor)MB" +
            "\n  GC Count: \(.jvm.gc.collectors.old.collection_count)" +
            "\n  GC Time: \(.jvm.gc.collectors.old.collection_time_in_millis)ms\n"
        '
        
        sleep 10
    done
}

# Start the performance monitor
monitor_performance &
monitor_pid=$!

# Wait for the test to finish
for pid in ${pids[*]}; do
    wait $pid
done

# Stop the monitor
kill $monitor_pid

echo "Performance test completed!"

2. Monitoring Dashboards

Grafana dashboard configuration (metric names like logstash_* assume a Prometheus exporter scraping the Logstash API):

{
  "dashboard": {
    "title": "Logstash Performance Dashboard",
    "panels": [
      {
        "title": "Events Throughput",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(logstash_pipeline_events_in_total[5m])",
            "legendFormat": "Events In/sec - {{pipeline}}"
          },
          {
            "expr": "rate(logstash_pipeline_events_out_total[5m])",
            "legendFormat": "Events Out/sec - {{pipeline}}"
          }
        ]
      },
      {
        "title": "Pipeline Duration",
        "type": "graph",
        "targets": [
          {
            "expr": "logstash_pipeline_events_duration_seconds",
            "legendFormat": "Duration - {{pipeline}}"
          }
        ]
      },
      {
        "title": "JVM Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "logstash_jvm_memory_heap_used_bytes / logstash_jvm_memory_heap_max_bytes * 100",
            "legendFormat": "Heap Usage %"
          }
        ]
      },
      {
        "title": "Queue Size",
        "type": "graph",
        "targets": [
          {
            "expr": "logstash_pipeline_queue_events",
            "legendFormat": "Queue Events - {{pipeline}}"
          }
        ]
      }
    ]
  }
}

3. Automated Tuning Script

#!/bin/bash
# auto-tune.sh

LOGSTASH_CONFIG="/etc/logstash/logstash.yml"
LOGSTASH_API="http://localhost:9600"
OPTIMIZATION_LOG="/var/log/logstash-optimization.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$OPTIMIZATION_LOG"
}

# Fetch current performance metrics
get_performance_metrics() {
    local stats=$(curl -s "$LOGSTASH_API/_node/stats")
    
    # Extract key metrics (events.in/out are cumulative counters, not rates)
    events_in_rate=$(echo "$stats" | jq -r '.pipelines | to_entries[0].value.events.in')
    events_out_rate=$(echo "$stats" | jq -r '.pipelines | to_entries[0].value.events.out')
    heap_usage=$(echo "$stats" | jq -r '.jvm.mem.heap_used_percent')
    gc_time=$(echo "$stats" | jq -r '.jvm.gc.collectors.old.collection_time_in_millis')
    
    echo "$events_in_rate,$events_out_rate,$heap_usage,$gc_time"
}

# Adjust pipeline worker count
adjust_pipeline_workers() {
    local current_workers=$(grep "pipeline.workers" "$LOGSTASH_CONFIG" | awk '{print $2}')
    local cpu_cores=$(nproc)
    local recommended_workers=$((cpu_cores * 2))
    
    if [ "$current_workers" -ne "$recommended_workers" ]; then
        log "Adjusting pipeline workers from $current_workers to $recommended_workers"
        sed -i "s/pipeline.workers: .*/pipeline.workers: $recommended_workers/" "$LOGSTASH_CONFIG"
        return 1  # restart required
    fi
    
    return 0
}

# Adjust batch size
adjust_batch_size() {
    local metrics=$(get_performance_metrics)
    local heap_usage=$(echo "$metrics" | cut -d',' -f3)
    local current_batch_size=$(grep "pipeline.batch.size" "$LOGSTASH_CONFIG" | awk '{print $2}')
    
    if [ "$heap_usage" -gt 80 ]; then
        # High memory usage - shrink the batch size
        local new_batch_size=$((current_batch_size * 80 / 100))
        log "High memory usage ($heap_usage%), reducing batch size to $new_batch_size"
        sed -i "s/pipeline.batch.size: .*/pipeline.batch.size: $new_batch_size/" "$LOGSTASH_CONFIG"
        return 1
    elif [ "$heap_usage" -lt 50 ]; then
        # Low memory usage - grow the batch size
        local new_batch_size=$((current_batch_size * 120 / 100))
        log "Low memory usage ($heap_usage%), increasing batch size to $new_batch_size"
        sed -i "s/pipeline.batch.size: .*/pipeline.batch.size: $new_batch_size/" "$LOGSTASH_CONFIG"
        return 1
    fi
    
    return 0
}

# Main tuning routine
main() {
    log "Starting Logstash auto-tuning..."
    
    local restart_needed=0
    
    # Adjust worker count (helpers return 1 when they changed the config)
    adjust_pipeline_workers || restart_needed=1
    
    # Adjust batch size
    adjust_batch_size || restart_needed=1
    
    # Restart if required
    if [ "$restart_needed" -eq 1 ]; then
        log "Configuration changed, restarting Logstash..."
        systemctl restart logstash
        
        # Wait for the restart to finish
        sleep 30
        
        # Check post-restart performance
        local new_metrics=$(get_performance_metrics)
        log "Post-restart metrics: $new_metrics"
    else
        log "No optimization needed at this time"
    fi
}

# Run the tuning pass
main

Troubleshooting and Debugging

1. Diagnosing Common Issues

Diagnostics script:

#!/bin/bash
# logstash-diagnostics.sh

LOGSTASH_HOME="/usr/share/logstash"
LOGSTASH_CONFIG="/etc/logstash"
LOGSTASH_LOGS="/var/log/logstash"
LOGSTASH_API="http://localhost:9600"

echo "Logstash Diagnostics Report"
echo "==========================="
echo "Generated: $(date)"
echo

# 1. Check Logstash service status
echo "1. Service Status:"
systemctl status logstash --no-pager
echo

# 2. Check listening ports
echo "2. Port Listening:"
netstat -tlnp | grep -E ':(5044|9600)'
echo

# 3. Validate configuration syntax
echo "3. Configuration Validation:"
$LOGSTASH_HOME/bin/logstash --config.test_and_exit --path.config="$LOGSTASH_CONFIG/conf.d"
echo

# 4. Check API health
echo "4. API Health Check:"
curl -s "$LOGSTASH_API" | jq -r '.status' 2>/dev/null || echo "API not responding"
echo

# 5. Check pipeline status
echo "5. Pipeline Status:"
curl -s "$LOGSTASH_API/_node/stats/pipelines" | jq -r '
    .pipelines | to_entries[] | 
    "Pipeline: \(.key)" +
    "\n  Status: \(.value.events.in > 0 and "active" or "inactive")" +
    "\n  Events In: \(.value.events.in)" +
    "\n  Events Out: \(.value.events.out)" +
    "\n  Failures: \(.value.events.filtered)\n"
' 2>/dev/null || echo "Unable to get pipeline stats"
echo

# 6. Check JVM status
echo "6. JVM Status:"
curl -s "$LOGSTASH_API/_node/stats/jvm" | jq -r '
    "Heap Used: \(.jvm.mem.heap_used_percent)%" +
    "\nHeap Max: \(.jvm.mem.heap_max_in_bytes / 1024 / 1024 | floor)MB" +
    "\nGC Count: \(.jvm.gc.collectors.old.collection_count)" +
    "\nUptime: \(.jvm.uptime_in_millis / 1000 / 60 | floor) minutes"
' 2>/dev/null || echo "Unable to get JVM stats"
echo

# 7. Check recent error logs
echo "7. Recent Error Logs:"
if [ -f "$LOGSTASH_LOGS/logstash-plain.log" ]; then
    tail -20 "$LOGSTASH_LOGS/logstash-plain.log" | grep -i error
else
    echo "Log file not found"
fi
echo

# 8. Check disk space
echo "8. Disk Space:"
df -h | grep -E '(Filesystem|/var|/tmp|/usr)'
echo

# 9. Check memory usage
echo "9. Memory Usage:"
free -h
echo

# 10. Check CPU load
echo "10. CPU Load:"
uptime
echo

echo "Diagnostics completed."

2. Performance Troubleshooting

#!/bin/bash
# performance-troubleshoot.sh

LOGSTASH_API="http://localhost:9600"

# Check hot threads
check_hot_threads() {
    echo "Hot Threads Analysis:"
    echo "===================="
    curl -s "$LOGSTASH_API/_node/hot_threads?threads=10" | head -50
    echo
}

# Check pipeline bottlenecks
check_pipeline_bottlenecks() {
    echo "Pipeline Bottleneck Analysis:"
    echo "============================"
    
    local stats=$(curl -s "$LOGSTASH_API/_node/stats/pipelines")
    
    echo "$stats" | jq -r '
        .pipelines | to_entries[] | 
        "Pipeline: \(.key)" +
        "\n  Input Queue: \(.value.queue.events)" +
        "\n  Filter Duration: \(.value.events.duration_in_millis)ms" +
        "\n  Output Duration: \(.value.events.queue_push_duration_in_millis)ms" +
        "\n  Throughput: \(.value.events.in / (.value.events.duration_in_millis / 1000) | floor) events/sec\n"
    '
}

# Check for memory leaks
check_memory_leaks() {
    echo "Memory Leak Detection:"
    echo "====================="
    
    # Sample heap usage repeatedly; steady growth suggests a leak
    for i in {1..5}; do
        local heap_usage=$(curl -s "$LOGSTASH_API/_node/stats/jvm" | jq -r '.jvm.mem.heap_used_percent')
        echo "Sample $i: Heap usage $heap_usage%"
        sleep 10
    done
}

# Check queue backlog (capacity fields exist only for persisted queues)
check_queue_backlog() {
    echo "Queue Backlog Analysis:"
    echo "======================"
    
    local queue_stats=$(curl -s "$LOGSTASH_API/_node/stats/pipelines")
    
    echo "$queue_stats" | jq -r '
        .pipelines | to_entries[] | 
        "Pipeline: \(.key)" +
        "\n  Queue Events: \(.value.queue.events)" +
        "\n  Queue Capacity: \(.value.queue.capacity.max_queue_size_in_bytes)" +
        "\n  Queue Usage: \(.value.queue.capacity.queue_size_in_bytes / .value.queue.capacity.max_queue_size_in_bytes * 100 | floor)%\n"
    '
}

# Main
main() {
    echo "Logstash Performance Troubleshooting"
    echo "===================================="
    echo "Timestamp: $(date)"
    echo
    
    check_hot_threads
    check_pipeline_bottlenecks
    check_memory_leaks
    check_queue_backlog
}

main

3. Debug Configuration

# debug.conf - debug pipeline
input {
  # Debug input. Note: the generator input emits the message verbatim -
  # sprintf references like %{sequence} are not interpolated here
  generator {
    message => '{"level": "INFO", "message": "Debug test message"}'
    count => 100
    codec => json
  }
}

filter {
  # Add debug metadata
  mutate {
    add_field => {
      "[@metadata][debug]" => "true"
      "[@metadata][pipeline_start]" => "%{+ISO8601}"
    }
  }
  
  # Record filter start time
  ruby {
    code => "
      event.set('[@metadata][filter_start]', Time.now.to_f)
    "
  }
  
  # Simulated processing logic
  if [level] == "INFO" {
    mutate {
      add_tag => ["info_processed"]
    }
  }
  
  # Record processing completion time
  ruby {
    code => "
      start_time = event.get('[@metadata][filter_start]')
      if start_time
        processing_time = (Time.now.to_f - start_time) * 1000
        event.set('[@metadata][processing_time_ms]', processing_time)
      end
    "
  }
}

output {
  # Debug output to a file
  file {
    path => "/var/log/logstash/debug-%{+YYYY.MM.dd}.log"
    codec => json_lines
  }
  
  # Debug output to the console
  stdout {
    codec => rubydebug {
      metadata => true
    }
  }
  
  # Performance statistics output (log slow events)
  if [@metadata][processing_time_ms] {
    if [@metadata][processing_time_ms] > 100 {
      file {
        path => "/var/log/logstash/slow-processing.log"
        codec => line {
          format => "Slow processing detected: %{[@metadata][processing_time_ms]}ms - %{message}"
        }
      }
    }
  }
}

Security Configuration

1. SSL/TLS Configuration

# secure-input.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_verify_mode => "force_peer"
    ssl_peer_metadata => true
  }
}

filter {
  # Add SSL client certificate info
  if [@metadata][tls_peer] {
    mutate {
      add_field => {
        "client_cert_subject" => "%{[@metadata][tls_peer][subject]}"
        "client_cert_issuer" => "%{[@metadata][tls_peer][issuer]}"
      }
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    ssl => true
    ssl_certificate_verification => true
    ssl_certificate => "/etc/logstash/certs/client.crt"
    ssl_key => "/etc/logstash/certs/client.key"
    cacert => "/etc/logstash/certs/ca.crt"
    
    user => "logstash_writer"
    password => "${LOGSTASH_PASSWORD}"
    
    index => "secure-logs-%{+YYYY.MM.dd}"
  }
}
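
The certificates referenced above can come from any PKI; for a lab setup, a self-signed CA is enough. A minimal openssl sketch with placeholder subject names:

# Create a throwaway CA and a server certificate signed by it
openssl req -x509 -newkey rsa:4096 -nodes -days 365 \
    -subj "/CN=Logstash Lab CA" -keyout ca.key -out ca.crt

openssl req -newkey rsa:4096 -nodes \
    -subj "/CN=logstash.example.com" -keyout logstash.key -out logstash.csr

openssl x509 -req -in logstash.csr -CA ca.crt -CAkey ca.key \
    -CAcreateserial -days 365 -out logstash.crt

# The beats input expects the private key in PKCS#8; convert if needed
openssl pkcs8 -topk8 -nocrypt -in logstash.key -out logstash.pkcs8.key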

2. Access Control

# logstash.yml - security settings
api.http.host: "127.0.0.1"  # restrict API access to localhost
api.http.port: 9600
api.auth.type: basic
api.auth.basic.username: "admin"
api.auth.basic.password: "${API_PASSWORD}"

# Logging (JSON format makes the logs easier to audit and ship)
log.level: info
path.logs: /var/log/logstash
log.format: json

# Security-related settings
config.support_escapes: false   # reduce the config-injection surface
config.reload.automatic: false  # disable automatic reloads
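
With basic auth enabled, every API call must carry credentials. A quick check, assuming the settings above and API_PASSWORD in the environment:

# Unauthenticated requests should now be rejected (expect HTTP 401)
curl -s -o /dev/null -w "%{http_code}\n" http://127.0.0.1:9600/

# Authenticated request
curl -s -u "admin:${API_PASSWORD}" http://127.0.0.1:9600/_node/stats/jvm | \
    jq '.jvm.uptime_in_millis'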

3. Data Masking

# data-masking.conf
filter {
  # Mask credit card numbers
  if [message] =~ /\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}/ {
    mutate {
      gsub => [
        "message", "\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}", "****-****-****-****"
      ]
      add_tag => ["credit_card_masked"]
    }
  }
  
  # Mask email addresses
  if [message] =~ /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/ {
    mutate {
      gsub => [
        "message", "[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "***@***.***"
      ]
      add_tag => ["email_masked"]
    }
  }
  
  # Mask IP addresses
  if [client_ip] {
    ruby {
      code => "
        ip = event.get('client_ip')
        if ip
          parts = ip.split('.')
          if parts.length == 4
            masked_ip = parts[0] + '.' + parts[1] + '.***.' + parts[3]
            event.set('client_ip_masked', masked_ip)
            event.remove('client_ip')
          end
        end
      "
    }
  }
  
  # Redact sensitive fields
  if [password] {
    mutate {
      replace => { "password" => "[REDACTED]" }
      add_tag => ["password_redacted"]
    }
  }
  
  if [ssn] {
    mutate {
      replace => { "ssn" => "***-**-****" }
      add_tag => ["ssn_redacted"]
    }
  }
}
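
The patterns above are easy to sanity-check outside Logstash before deploying. A quick smoke test with grep -P, whose PCRE syntax is close enough to Ruby's for these expressions:

# Credit-card pattern: should match the first line, not the second
echo "payment with 4111-1111-1111-1111 accepted" | \
    grep -P '\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}'
echo "order id 1234-56" | \
    grep -P '\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}' || echo "no match (expected)"

# Email pattern
echo "contact alice@example.com" | \
    grep -P '[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'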

Practical Case Studies

1. Large-Scale Log Processing Architecture

graph TB
    subgraph "数据源层"
        A1[Web服务器集群]
        A2[应用服务器集群]
        A3[数据库服务器]
        A4[安全设备]
    end
    
    subgraph "收集层"
        B1[Filebeat集群]
        B2[Metricbeat集群]
        B3[Packetbeat集群]
    end
    
    subgraph "消息队列层"
        C1[Kafka集群]
        C2[Redis集群]
    end
    
    subgraph "处理层"
        D1[Logstash集群 - Web日志]
        D2[Logstash集群 - 应用日志]
        D3[Logstash集群 - 安全日志]
    end
    
    subgraph "存储层"
        E1[Elasticsearch热节点]
        E2[Elasticsearch温节点]
        E3[Elasticsearch冷节点]
    end
    
    A1 --> B1
    A2 --> B1
    A3 --> B2
    A4 --> B3
    
    B1 --> C1
    B2 --> C1
    B3 --> C2
    
    C1 --> D1
    C1 --> D2
    C2 --> D3
    
    D1 --> E1
    D2 --> E1
    D3 --> E1
    
    E1 --> E2
    E2 --> E3

2. Real-Time Alerting System

# real-time-alerting.conf
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["application-logs", "security-logs"]
    group_id => "logstash-alerting"
    consumer_threads => 4
  }
}

filter {
  # Parse JSON logs
  json {
    source => "message"
  }
  
  # Error detection and alerting
  if [level] == "ERROR" or [status_code] >= 500 {
    mutate {
      add_field => {
        "alert_type" => "error"
        "alert_severity" => "high"
        "alert_timestamp" => "%{@timestamp}"
      }
      add_tag => ["alert_required"]
    }
  }
  
  # Security event detection
  if [event_type] == "security" {
    if [action] == "login_failed" {
      # Detect brute-force attempts
      aggregate {
        task_id => "%{client_ip}"
        code => "
          map['failed_attempts'] ||= 0
          map['failed_attempts'] += 1
          map['last_attempt'] = event.get('@timestamp')
          
          if map['failed_attempts'] >= 5
            event.set('alert_type', 'brute_force')
            event.set('alert_severity', 'critical')
            event.tag('security_alert')
          end
        "
        push_previous_map_as_event => true
        timeout => 300
      }
    }
  }
  
  # Performance anomaly detection
  if [response_time] {
    if [response_time] > 5000 {
      mutate {
        add_field => {
          "alert_type" => "performance"
          "alert_severity" => "medium"
        }
        add_tag => ["performance_alert"]
      }
    }
  }
}

output {
  # Regular log storage
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  
  # Alert outputs
  if "alert_required" in [tags] or "security_alert" in [tags] or "performance_alert" in [tags] {
    # Send to the alert queue
    kafka {
      bootstrap_servers => "kafka:9092"
      topic_id => "alerts"
      codec => json
    }
    
    # Send email alerts (the plugin's SMTP host/port options are address/port)
    email {
      to => "admin@example.com"
      subject => "Logstash Alert: %{alert_type}"
      body => "Alert Details:\nType: %{alert_type}\nSeverity: %{alert_severity}\nTimestamp: %{@timestamp}\nMessage: %{message}"
      address => "smtp.example.com"
      port => 587
      username => "alerts@example.com"
      password => "${SMTP_PASSWORD}"
    }
    
    # Send to Slack
    http {
      url => "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "🚨 Alert: %{alert_type} - %{alert_severity}"
        "attachments" => [{
          "color" => "danger"
          "fields" => [{
            "title" => "Details"
            "value" => "%{message}"
            "short" => false
          }]
        }]
      }
    }
  }
}
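
The error path can be exercised end to end by producing a synthetic ERROR event onto one of the input topics. A sketch with the stock Kafka CLI tools; script names and paths vary by installation:

# Publish a test event that should trip the error rule above
echo '{"level":"ERROR","message":"synthetic alert test","status_code":500}' | \
    kafka-console-producer.sh --bootstrap-server kafka1:9092 --topic application-logs

# Watch the alerts topic for the resulting alert document
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic alerts --from-beginning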

3. Log Aggregation and Analysis

# log-aggregation.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Basic parsing
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
  
  # Time-window aggregation
  aggregate {
    task_id => "hourly_stats_%{+YYYY.MM.dd.HH}"
    code => "
      map['total_requests'] ||= 0
      map['total_requests'] += 1
      
      map['status_codes'] ||= {}
      status = event.get('response')
      map['status_codes'][status] ||= 0
      map['status_codes'][status] += 1
      
      map['bytes_sent'] ||= 0
      bytes = event.get('bytes').to_i
      map['bytes_sent'] += bytes
      
      # Use a hash as a set: Set is not required by default and does not
      # serialize cleanly when the map is pushed as an event
      map['unique_ips'] ||= {}
      map['unique_ips'][event.get('clientip')] = true
      
      # Collect response-time samples
      response_time = event.get('response_time')
      if response_time
        map['response_times'] ||= []
        map['response_times'] << response_time.to_f
      end
    "
    push_previous_map_as_event => true
    timeout => 3600
    timeout_tags => ['aggregated_hourly']
  }
  
  # Post-process the aggregated map
  if "aggregated_hourly" in [tags] {
    ruby {
      code => "
        # Compute response-time statistics
        response_times = event.get('response_times') || []
        if response_times.length > 0
          sorted_times = response_times.sort
          event.set('avg_response_time', response_times.sum / response_times.length)
          event.set('min_response_time', sorted_times.first)
          event.set('max_response_time', sorted_times.last)
          event.set('p95_response_time', sorted_times[(sorted_times.length * 0.95).to_i])
          event.set('p99_response_time', sorted_times[(sorted_times.length * 0.99).to_i])
        end
        
        # Set the unique IP count (the hash keys act as the set)
        unique_ips = event.get('unique_ips')
        if unique_ips
          event.set('unique_ip_count', unique_ips.length)
          event.remove('unique_ips')
        end
        
        # Drop the raw response-time array
        event.remove('response_times')
      "
    }
    
    mutate {
      add_field => {
        "[@metadata][index]" => "logstash-stats-%{+YYYY.MM.dd}"
        "document_type" => "hourly_aggregation"
      }
    }
  }
}

output {
  if "aggregated_hourly" in [tags] {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "%{[@metadata][index]}"
    }
  } else {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "logstash-raw-%{+YYYY.MM.dd}"
    }
  }
}
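
The hourly documents that land in Elasticsearch are plain documents and can be queried directly. A sketch; the index name follows the pattern set in the filter above:

# Pull today's hourly aggregates, newest first
curl -s "http://elasticsearch:9200/logstash-stats-$(date +%Y.%m.%d)/_search?pretty" \
    -H 'Content-Type: application/json' -d '{
  "size": 24,
  "sort": [{ "@timestamp": "desc" }],
  "_source": ["total_requests", "unique_ip_count",
              "avg_response_time", "p95_response_time"]
}'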

4. Multi-Tenant Architecture

# multi-tenant.conf
input {
  beats {
    port => 5044
  }
}

filter {
  # Tenant identification
  if [fields][tenant_id] {
    mutate {
      add_field => { "tenant_id" => "%{[fields][tenant_id]}" }
    }
  } else if [beat][hostname] {
    # Infer the tenant from the hostname
    if [beat][hostname] =~ /^tenant1-/ {
      mutate { add_field => { "tenant_id" => "tenant1" } }
    } else if [beat][hostname] =~ /^tenant2-/ {
      mutate { add_field => { "tenant_id" => "tenant2" } }
    } else {
      mutate { add_field => { "tenant_id" => "default" } }
    }
  }
  
  # Tenant-specific processing
  if [tenant_id] == "tenant1" {
    # Tenant 1 specific logic
    grok {
      match => { "message" => "%{TENANT1_LOG_PATTERN}" }
      patterns_dir => ["/etc/logstash/patterns/tenant1"]
    }
    
    mutate {
      add_field => {
        "[@metadata][index_prefix]" => "tenant1-logs"
        "[@metadata][retention_days]" => "30"
      }
    }
  } else if [tenant_id] == "tenant2" {
    # Tenant 2 specific logic
    json {
      source => "message"
    }
    
    mutate {
      add_field => {
        "[@metadata][index_prefix]" => "tenant2-logs"
        "[@metadata][retention_days]" => "90"
      }
    }
  } else {
    # Default handling
    mutate {
      add_field => {
        "[@metadata][index_prefix]" => "default-logs"
        "[@metadata][retention_days]" => "7"
      }
    }
  }
  
  # Data isolation and routing
  mutate {
    add_field => {
      "[@metadata][routing]" => "%{tenant_id}"
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index_prefix]}-%{+YYYY.MM.dd}"
    routing => "%{[@metadata][routing]}"
    
    # Note: template/template_name are resolved once at plugin startup, not
    # per event, so %{tenant_id} cannot be used there. Pre-install per-tenant
    # index templates in Elasticsearch instead (see the sketch below)
  }
}
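
Because output templates cannot vary per event, per-tenant index templates are best installed in Elasticsearch ahead of time, as noted above. A sketch for one tenant; the mapping content is illustrative:

# Pre-install an index template for tenant1's indices
curl -s -X PUT "http://elasticsearch:9200/_index_template/tenant1-template" \
    -H 'Content-Type: application/json' -d '{
  "index_patterns": ["tenant1-logs-*"],
  "template": {
    "settings": { "number_of_shards": 1 },
    "mappings": {
      "properties": { "tenant_id": { "type": "keyword" } }
    }
  }
}'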

Best Practices

1. Configuration Management Best Practices

Configuration version control:

#!/bin/bash
# config-management.sh

CONFIG_REPO="/etc/logstash/config-repo"
CONFIG_DIR="/etc/logstash/conf.d"
BACKUP_DIR="/etc/logstash/backups"

# Initialize the config repository
init_config_repo() {
    if [ ! -d "$CONFIG_REPO" ]; then
        git init "$CONFIG_REPO"
        cd "$CONFIG_REPO"
        git config user.name "Logstash Config Manager"
        git config user.email "logstash@example.com"
    fi
}

# Back up the current configuration
backup_config() {
    local timestamp=$(date +%Y%m%d_%H%M%S)
    local backup_path="$BACKUP_DIR/config_$timestamp"
    
    mkdir -p "$backup_path"
    cp -r "$CONFIG_DIR"/* "$backup_path/"
    
    echo "Configuration backed up to: $backup_path"
}

# Deploy a configuration version
deploy_config() {
    local version=$1
    
    if [ -z "$version" ]; then
        echo "Usage: deploy_config <version>"
        return 1
    fi
    
    # Back up the current configuration first
    backup_config
    
    # Check out the requested version
    cd "$CONFIG_REPO"
    git checkout "$version"
    
    # Copy the config files into place
    cp -r * "$CONFIG_DIR/"
    
    # Validate the configuration
    if /usr/share/logstash/bin/logstash --config.test_and_exit --path.config="$CONFIG_DIR"; then
        echo "Configuration validation passed"
        systemctl reload logstash
        echo "Configuration deployed successfully"
    else
        echo "Configuration validation failed"
        # Roll back to the latest backup
        rollback_config
        return 1
    fi
}

# Roll back the configuration
rollback_config() {
    local latest_backup=$(ls -t "$BACKUP_DIR" | head -1)
    
    if [ -n "$latest_backup" ]; then
        cp -r "$BACKUP_DIR/$latest_backup"/* "$CONFIG_DIR/"
        systemctl reload logstash
        echo "Configuration rolled back to: $latest_backup"
    else
        echo "No backup found for rollback"
    fi
}

# Entry point
case "$1" in
    init)
        init_config_repo
        ;;
    backup)
        backup_config
        ;;
    deploy)
        deploy_config "$2"
        ;;
    rollback)
        rollback_config
        ;;
    *)
        echo "Usage: $0 {init|backup|deploy <version>|rollback}"
        exit 1
        ;;
esac

2. Monitoring and Alerting Best Practices

# monitoring-rules.yml
groups:
- name: logstash.rules
  rules:
  # Pipeline throughput alert
  - alert: LogstashLowThroughput
    expr: rate(logstash_pipeline_events_in_total[5m]) < 100
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "Logstash pipeline throughput is low"
      description: "Pipeline {{ $labels.pipeline }} throughput is {{ $value }} events/sec"
  
  # Memory usage alert
  - alert: LogstashHighMemoryUsage
    expr: logstash_jvm_memory_heap_used_bytes / logstash_jvm_memory_heap_max_bytes > 0.8
    for: 5m
    labels:
      severity: warning
    annotations:
      summary: "Logstash memory usage is high"
      description: "Memory usage is {{ $value | humanizePercentage }}"
  
  # Queue backlog alert
  - alert: LogstashQueueBacklog
    expr: logstash_pipeline_queue_events > 10000
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "Logstash queue has significant backlog"
      description: "Queue has {{ $value }} events pending"
  
  # GC time alert
  - alert: LogstashHighGCTime
    expr: rate(logstash_jvm_gc_collection_time_seconds[5m]) > 0.1
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "Logstash GC time is high"
      description: "GC time is {{ $value | humanizeDuration }} per second"

3. Performance Tuning Best Practices

Performance tuning checklist:

## Logstash Performance Tuning Checklist

### Hardware Resources
- [ ] CPU cores >= pipeline worker count
- [ ] Memory >= 4GB (8GB+ recommended)
- [ ] SSD storage (especially for persisted queues)
- [ ] Sufficient network bandwidth

### JVM Configuration
- [ ] Heap set to 50-75% of physical memory
- [ ] G1GC garbage collector enabled
- [ ] Sensible GC parameters
- [ ] JVM performance monitoring enabled

### Pipeline Configuration
- [ ] Worker count = CPU core count
- [ ] Batch size tuned to available memory (125-1000)
- [ ] Moderate batch delay (5-50ms)
- [ ] Persisted queues for high-throughput workloads

### Filter Optimization
- [ ] Avoid complex regular expressions
- [ ] Use conditionals to skip unnecessary processing
- [ ] Use Ruby code blocks sparingly
- [ ] Avoid aggregation on the hot path

### Output Optimization
- [ ] Use bulk outputs
- [ ] Configure appropriate flush intervals
- [ ] Use connection pooling
- [ ] Enable compression (where applicable)

### Monitoring Metrics
- [ ] Monitor event throughput
- [ ] Monitor memory usage
- [ ] Monitor GC frequency and time
- [ ] Monitor queue size
- [ ] Monitor error rates

Summary

This chapter explored Logstash's advanced features and best practices, including:

Key Takeaways

  1. Architecture: a deep understanding of Logstash's internal architecture and event processing flow
  2. Advanced configuration: multi-pipeline setups, templated configuration, and dynamic reloading
  3. Plugin development: writing custom plugins to extend functionality
  4. Cluster deployment: highly available, scalable cluster architectures
  5. Performance tuning: systematic optimization methods and monitoring
  6. Troubleshooting: a complete diagnostic and debugging toolchain
  7. Security configuration: comprehensive protection measures
  8. Case studies: solutions for real-world scenarios

Best Practice Summary

  1. Configuration management: use version control and automated deployment
  2. Monitoring and alerting: build a thorough monitoring and alerting system
  3. Performance optimization: follow the tuning checklist
  4. Security: apply layered protection measures
  5. Operational automation: use scripts and tooling to improve efficiency


Coming up next: Chapter 5 covers Elasticsearch in depth, including index management, query optimization, cluster management, and other advanced topics.