概述

在生产环境中运行ELK Stack时,经常会遇到各种性能问题和故障。本章将详细介绍常见问题的诊断方法、解决方案以及性能优化策略,帮助您构建稳定、高效的日志分析系统。

常见故障诊断

1. Elasticsearch集群故障

集群状态诊断

健康状态检查:

#!/bin/bash
# elasticsearch-health-check.sh
#
# Prints a point-in-time summary of an Elasticsearch cluster: health, nodes,
# shards, unassigned shards, explicit cluster settings and pending tasks.
# Requires curl and jq.

# Connection settings. Each value can be overridden from the environment so
# credentials do not have to be edited into the script; the fallbacks are the
# original hard-coded values.
ES_HOST="${ES_HOST:-http://localhost:9200}"
ES_USER="${ES_USER:-elastic}"
ES_PASS="${ES_PASS:-your_password}"

# Issue an authenticated GET against the cluster.
#   $1 - request path including the query string (e.g. "/_cluster/health?pretty")
# -S keeps curl quiet on success but still reports connection errors.
es_get() {
    curl -sS -u "${ES_USER}:${ES_PASS}" "${ES_HOST}$1"
}

# Overall cluster health (status, node count, shard counters)
check_cluster_health() {
    echo "=== Cluster Health ==="
    es_get "/_cluster/health?pretty" | jq .
    echo
}

# Per-node resource usage and roles
check_nodes() {
    echo "=== Node Status ==="
    es_get "/_cat/nodes?v&h=name,heap.percent,ram.percent,cpu,load_1m,disk.used_percent,node.role,master"
    echo
}

# First 20 lines of the shard table (header included)
check_shards() {
    echo "=== Shard Status ==="
    es_get "/_cat/shards?v&h=index,shard,prirep,state,docs,store,node" | head -20
    echo
}

# Unassigned shards together with the reason they are unassigned.
# The column header is kept, and a healthy cluster gets an explicit message
# instead of an empty section.
check_unassigned_shards() {
    echo "=== Unassigned Shards ==="
    local shards
    shards="$(es_get "/_cat/shards?v&h=index,shard,prirep,state,unassigned.reason")"
    if grep -q UNASSIGNED <<< "${shards}"; then
        # NR == 1 is the header row produced by the "v" flag
        awk 'NR == 1 || /UNASSIGNED/' <<< "${shards}"
    else
        echo "No unassigned shards"
    fi
    echo
}

# Explicitly configured cluster settings.
# Only .persistent and .transient are printed, so the defaults are not
# requested from the server.
check_cluster_settings() {
    echo "=== Cluster Settings ==="
    es_get "/_cluster/settings?pretty" | jq '.persistent, .transient'
    echo
}

# Cluster-level tasks that are queued on the master
check_pending_tasks() {
    echo "=== Pending Tasks ==="
    es_get "/_cluster/pending_tasks?pretty" | jq .
    echo
}

# Entry point: run every check in order
main() {
    echo "Elasticsearch Cluster Health Check - $(date)"
    echo "================================================"
    
    check_cluster_health
    check_nodes
    check_shards
    check_unassigned_shards
    check_cluster_settings
    check_pending_tasks
}

main

分片分配问题

分片分配诊断:

# Ask the cluster to explain the allocation decision for one shard
curl --request GET "localhost:9200/_cluster/allocation/explain?pretty" \
  --header 'Content-Type: application/json' \
  --data '{
    "index": "your-index-name",
    "shard": 0,
    "primary": true
  }'

# Manually allocate a replica shard to a specific node
curl --request POST "localhost:9200/_cluster/reroute?pretty" \
  --header 'Content-Type: application/json' \
  --data '{
    "commands": [
      {
        "allocate_replica": {
          "index": "your-index-name",
          "shard": 0,
          "node": "node-name"
        }
      }
    ]
  }'

# Cancel an in-progress shard allocation (replicas only: allow_primary is false)
curl --request POST "localhost:9200/_cluster/reroute?pretty" \
  --header 'Content-Type: application/json' \
  --data '{
    "commands": [
      {
        "cancel": {
          "index": "your-index-name",
          "shard": 0,
          "node": "node-name",
          "allow_primary": false
        }
      }
    ]
  }'

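分片分配设置优化(注意:下面示例中的 `indices.recovery.concurrent_streams` 已在 Elasticsearch 5.0 中移除,在 5.0 及更高版本的集群上提交前请删除该项,否则请求会因未知设置而被拒绝):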

{
  "persistent": {
    "cluster.routing.allocation.enable": "all",
    "cluster.routing.allocation.node_concurrent_recoveries": 2,
    "cluster.routing.allocation.cluster_concurrent_rebalance": 2,
    "cluster.routing.allocation.awareness.attributes": "rack_id",
    "cluster.routing.allocation.balance.shard": 0.45,
    "cluster.routing.allocation.balance.index": 0.55,
    "cluster.routing.allocation.balance.threshold": 1.0,
    "indices.recovery.max_bytes_per_sec": "100mb",
    "indices.recovery.concurrent_streams": 3
  }
}

2. 内存和性能问题

JVM堆内存分析

堆内存监控脚本:

# elasticsearch_memory_monitor.py
import requests
import json
import time
from datetime import datetime
import matplotlib.pyplot as plt
import pandas as pd

class ESMemoryMonitor:
    """Poll Elasticsearch node stats and flag JVM heap, GC and OS memory pressure.

    Every successful poll is reduced to a snapshot dict (see
    ``analyze_memory_usage``) and appended to ``self.data``, oldest first.
    """

    def __init__(self, es_host, username, password):
        """Store the base URL and basic-auth credentials; no request is made."""
        self.es_host = es_host
        self.auth = (username, password)
        self.data = []

    def get_node_stats(self):
        """Fetch ``/_nodes/stats/jvm,os,fs``.

        Returns the decoded JSON body, or ``None`` (after printing the error)
        when the request fails, the server answers with an HTTP error status,
        or the body is not valid JSON.
        """
        try:
            response = requests.get(
                f"{self.es_host}/_nodes/stats/jvm,os,fs",
                auth=self.auth,
                timeout=10
            )
            # An error response (e.g. 401) is still JSON but has no 'nodes'
            # key; reject it here instead of failing later with a KeyError.
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error getting node stats: {e}")
            return None

    def analyze_memory_usage(self, stats):
        """Reduce a node-stats response to one snapshot.

        Returns ``{'timestamp': datetime, 'nodes': {node_id: {...}}}`` where
        each node entry carries heap, GC counter, OS memory and CPU figures
        copied from the response.
        """
        analysis = {
            'timestamp': datetime.now(),
            'nodes': {}
        }

        for node_id, node_stats in stats['nodes'].items():
            jvm_mem = node_stats['jvm']['mem']
            collectors = node_stats['jvm']['gc']['collectors']
            os_stats = node_stats['os']

            analysis['nodes'][node_id] = {
                'name': node_stats['name'],
                'heap_used_percent': jvm_mem['heap_used_percent'],
                'heap_used_bytes': jvm_mem['heap_used_in_bytes'],
                'heap_max_bytes': jvm_mem['heap_max_in_bytes'],
                'non_heap_used_bytes': jvm_mem['non_heap_used_in_bytes'],
                'gc_young_collection_count': collectors['young']['collection_count'],
                'gc_young_collection_time': collectors['young']['collection_time_in_millis'],
                'gc_old_collection_count': collectors['old']['collection_count'],
                'gc_old_collection_time': collectors['old']['collection_time_in_millis'],
                'os_mem_used_percent': os_stats['mem']['used_percent'],
                'os_cpu_percent': os_stats['cpu']['percent']
            }

        return analysis

    def _previous_snapshot(self, analysis):
        """Return the newest stored snapshot that is not ``analysis`` itself."""
        for snapshot in reversed(self.data):
            if snapshot is not analysis:
                return snapshot
        return None

    def detect_memory_issues(self, analysis):
        """Return a list of issue dicts for the given snapshot.

        Each issue has ``severity``, ``node``, ``issue`` and
        ``recommendation`` keys. GC rates are computed against the previous
        snapshot in ``self.data`` using the real time elapsed between the two
        timestamps; ``analysis`` may or may not already be stored in
        ``self.data``.
        """
        issues = []

        # monitor() appends the snapshot before calling this method, so the
        # last element of self.data can be `analysis` itself and must be
        # skipped when looking for the comparison baseline.
        previous = self._previous_snapshot(analysis)
        elapsed_minutes = 0.0
        if previous is not None:
            elapsed = analysis['timestamp'] - previous['timestamp']
            elapsed_minutes = elapsed.total_seconds() / 60

        for node_id, node_data in analysis['nodes'].items():
            node_name = node_data['name']

            # Heap usage
            if node_data['heap_used_percent'] > 85:
                issues.append({
                    'severity': 'critical',
                    'node': node_name,
                    'issue': f"High heap usage: {node_data['heap_used_percent']}%",
                    'recommendation': "Consider increasing heap size or reducing memory usage"
                })

            # GC frequency (collections per minute since the previous snapshot)
            prev_data = None
            if previous is not None and elapsed_minutes > 0:
                prev_data = previous['nodes'].get(node_id)
            if prev_data:
                gc_young_rate = (node_data['gc_young_collection_count'] -
                                 prev_data['gc_young_collection_count']) / elapsed_minutes
                gc_old_rate = (node_data['gc_old_collection_count'] -
                               prev_data['gc_old_collection_count']) / elapsed_minutes

                if gc_young_rate > 10:
                    issues.append({
                        'severity': 'warning',
                        'node': node_name,
                        'issue': f"High young GC rate: {gc_young_rate:.2f}/min",
                        'recommendation': "Consider tuning GC settings or reducing allocation rate"
                    })

                if gc_old_rate > 1:
                    issues.append({
                        'severity': 'critical',
                        'node': node_name,
                        'issue': f"High old GC rate: {gc_old_rate:.2f}/min",
                        'recommendation': "Urgent: Review memory usage and heap size"
                    })

            # System memory
            if node_data['os_mem_used_percent'] > 90:
                issues.append({
                    'severity': 'warning',
                    'node': node_name,
                    'issue': f"High system memory usage: {node_data['os_mem_used_percent']}%",
                    'recommendation': "Monitor system processes and consider adding more RAM"
                })

        return issues

    def generate_memory_report(self):
        """Build a text report from the collected snapshots.

        Returns ``"No data available"`` when nothing has been collected. The
        trend section is only included once more than 10 snapshots exist.
        """
        if not self.data:
            return "No data available"

        parts = [
            "\n=== Elasticsearch Memory Usage Report ===\n",
            f"Report generated at: {datetime.now()}\n",
            f"Data points collected: {len(self.data)}\n\n",
        ]

        # Latest snapshot
        latest = self.data[-1]
        parts.append("Current Status:\n")
        for node_data in latest['nodes'].values():
            parts.append(f"\nNode: {node_data['name']}\n")
            parts.append(f"  Heap Usage: {node_data['heap_used_percent']}% ({node_data['heap_used_bytes'] / 1024**3:.2f}GB / {node_data['heap_max_bytes'] / 1024**3:.2f}GB)\n")
            parts.append(f"  System Memory: {node_data['os_mem_used_percent']}%\n")
            parts.append(f"  CPU Usage: {node_data['os_cpu_percent']}%\n")

        # Trend over the last 10 snapshots
        if len(self.data) > 10:
            parts.append("\n\nTrend Analysis (last 10 data points):\n")
            recent_data = self.data[-10:]

            for node_id, node_data in latest['nodes'].items():
                # A node may be missing from older snapshots (joined later).
                heap_usage = [d['nodes'][node_id]['heap_used_percent']
                              for d in recent_data if node_id in d['nodes']]
                if not heap_usage:
                    continue

                avg_heap = sum(heap_usage) / len(heap_usage)
                parts.append(f"\nNode {node_data['name']}:\n")
                parts.append(f"  Average Heap Usage: {avg_heap:.1f}%\n")
                parts.append(f"  Peak Heap Usage: {max(heap_usage):.1f}%\n")
                parts.append(f"  Minimum Heap Usage: {min(heap_usage):.1f}%\n")

        return "".join(parts)

    def monitor(self, duration_minutes=60, interval_seconds=60):
        """Poll the cluster every ``interval_seconds`` for ``duration_minutes``.

        Prints detected issues and a one-line status per node on every poll,
        then prints the final report. Failed polls are skipped.
        """
        print(f"Starting memory monitoring for {duration_minutes} minutes...")

        end_time = time.time() + (duration_minutes * 60)

        while time.time() < end_time:
            stats = self.get_node_stats()
            if stats:
                analysis = self.analyze_memory_usage(stats)
                self.data.append(analysis)

                # Report issues
                issues = self.detect_memory_issues(analysis)
                if issues:
                    print(f"\n[{datetime.now()}] Memory issues detected:")
                    for issue in issues:
                        print(f"  {issue['severity'].upper()}: {issue['node']} - {issue['issue']}")
                        print(f"    Recommendation: {issue['recommendation']}")

                # Current status
                print(f"[{datetime.now()}] Memory status:")
                for node_data in analysis['nodes'].values():
                    print(f"  {node_data['name']}: Heap {node_data['heap_used_percent']}%, System {node_data['os_mem_used_percent']}%")

            time.sleep(interval_seconds)

        # Final report
        print("\n" + self.generate_memory_report())

# Usage example
if __name__ == "__main__":
    # Watch the local cluster for one hour, sampling once per minute.
    monitor = ESMemoryMonitor(
        es_host="http://localhost:9200", username="elastic", password="your_password"
    )
    monitor.monitor(duration_minutes=60, interval_seconds=60)

垃圾回收优化

JVM GC配置优化:

# jvm.options - G1GC configuration
# Min and max heap are set to the same value.
-Xms8g
-Xmx8g

# G1GC settings
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
# G1NewSizePercent and G1MaxNewSizePercent are experimental HotSpot options,
# so the unlock switch is placed ahead of them.
-XX:+UnlockExperimentalVMOptions
-XX:G1NewSizePercent=30
-XX:G1MaxNewSizePercent=40
-XX:MaxGCPauseMillis=200
-XX:G1MixedGCCountTarget=8
-XX:InitiatingHeapOccupancyPercent=45

# GC logging (JDK 9+ unified logging). The absolute path matches the file
# read by gc_analyzer.py.
# NOTE: -XX:+UseCGroupMemoryLimitForHeap was dropped from this section. The
# flag was removed in JDK 11, where it aborts JVM startup as an unrecognized
# option, and it has no effect anyway once -Xms/-Xmx are set explicitly.
-Xlog:gc*,gc+heap=info,safepoint:file=/var/log/elasticsearch/gc.log:time,level,tags

# Memory tuning
-XX:+AlwaysPreTouch
-Xss1m
-Djava.awt.headless=true
-Dfile.encoding=UTF-8
-Djna.nosys=true
-XX:-OmitStackTraceInFastThrow
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true

# Diagnostics
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log

GC分析脚本:

# gc_analyzer.py
import re
import matplotlib.pyplot as plt
from datetime import datetime
import pandas as pd

class GCLogAnalyzer:
    """Parse GC pause lines from a JVM unified-logging GC log and summarise them.

    Only lines that carry a bracketed ISO timestamp with a UTC offset, a
    ``GC(n)`` id, the word ``Pause`` and a ``<float>ms`` duration are used.
    """

    # timestamp (offset may be + or -), GC id, pause kind, duration in ms
    _GC_PAUSE_RE = re.compile(
        r'\[(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})[+-]\d{4}\].*?'
        r'GC\((\d+)\).*?'
        r'Pause (\w+).*?'
        r'(\d+\.\d+)ms'
    )

    def __init__(self, log_file):
        """Remember the log path; the file is not opened until parse_gc_log()."""
        self.log_file = log_file
        self.gc_events = []

    def parse_gc_log(self):
        """Read the log file and fill ``self.gc_events``.

        Each event is a dict with ``timestamp`` (naive datetime, the UTC
        offset is discarded), ``gc_id``, ``gc_type`` and ``duration_ms``.
        Calling this again replaces the previous events rather than appending
        duplicates.
        """
        events = []

        # errors='replace' keeps a stray undecodable byte from aborting the parse
        with open(self.log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                match = self._GC_PAUSE_RE.search(line)
                if not match:
                    continue

                timestamp_str, gc_id, gc_type, duration = match.groups()
                events.append({
                    'timestamp': datetime.fromisoformat(timestamp_str.replace('T', ' ')),
                    'gc_id': int(gc_id),
                    'gc_type': gc_type,
                    'duration_ms': float(duration)
                })

        self.gc_events = events

    def analyze_gc_performance(self):
        """Return summary statistics for the parsed events.

        Returns the string ``"No GC events found"`` when nothing was parsed,
        otherwise a dict with count, total/avg/max/min pause time and a
        per-type count. ``gc_frequency`` (events per hour) is only present
        when the events span a non-zero amount of time.
        """
        if not self.gc_events:
            return "No GC events found"

        df = pd.DataFrame(self.gc_events)

        analysis = {
            'total_events': len(df),
            'total_gc_time': df['duration_ms'].sum(),
            'avg_gc_time': df['duration_ms'].mean(),
            'max_gc_time': df['duration_ms'].max(),
            'min_gc_time': df['duration_ms'].min(),
            'gc_types': df['gc_type'].value_counts().to_dict()
        }

        # GC frequency; skipped when all events share one timestamp because
        # the rate is undefined (and would divide by zero).
        if len(df) > 1:
            time_span = (df['timestamp'].max() - df['timestamp'].min()).total_seconds()
            if time_span > 0:
                analysis['gc_frequency'] = len(df) / (time_span / 3600)  # per hour

        return analysis

    def generate_gc_report(self):
        """Return a text report with statistics and threshold-based advice."""
        analysis = self.analyze_gc_performance()

        if isinstance(analysis, str):
            return analysis

        report = "\n=== GC Performance Analysis ===\n"
        report += f"Total GC Events: {analysis['total_events']}\n"
        report += f"Total GC Time: {analysis['total_gc_time']:.2f}ms\n"
        report += f"Average GC Time: {analysis['avg_gc_time']:.2f}ms\n"
        report += f"Max GC Time: {analysis['max_gc_time']:.2f}ms\n"
        report += f"Min GC Time: {analysis['min_gc_time']:.2f}ms\n"

        if 'gc_frequency' in analysis:
            report += f"GC Frequency: {analysis['gc_frequency']:.2f} events/hour\n"

        report += "\nGC Types:\n"
        for gc_type, count in analysis['gc_types'].items():
            report += f"  {gc_type}: {count} events\n"

        # Recommendations
        report += "\n=== Performance Recommendations ===\n"

        if analysis['avg_gc_time'] > 100:
            report += "⚠️  Average GC time is high (>100ms). Consider:\n"
            report += "   - Increasing heap size\n"
            report += "   - Tuning GC parameters\n"
            report += "   - Reducing allocation rate\n"

        if analysis['max_gc_time'] > 1000:
            report += "🚨 Maximum GC time is very high (>1s). Urgent action needed:\n"
            report += "   - Review heap sizing\n"
            report += "   - Check for memory leaks\n"
            report += "   - Consider different GC algorithm\n"

        if 'gc_frequency' in analysis and analysis['gc_frequency'] > 100:
            report += "⚠️  GC frequency is high (>100/hour). Consider:\n"
            report += "   - Optimizing application code\n"
            report += "   - Reducing object allocation\n"
            report += "   - Tuning young generation size\n"

        return report

    def plot_gc_timeline(self):
        """Plot pause durations over time and a per-type count bar chart.

        Saves the figure to ``gc_analysis.png`` and then shows it. Prints a
        message and returns when there are no events.
        """
        if not self.gc_events:
            print("No GC events to plot")
            return

        df = pd.DataFrame(self.gc_events)

        plt.figure(figsize=(12, 8))

        # Pause duration timeline
        plt.subplot(2, 1, 1)
        plt.plot(df['timestamp'], df['duration_ms'], 'b-', alpha=0.7)
        plt.scatter(df['timestamp'], df['duration_ms'], c='red', s=20, alpha=0.6)
        plt.title('GC Duration Timeline')
        plt.ylabel('Duration (ms)')
        plt.grid(True, alpha=0.3)

        # Pause type distribution
        plt.subplot(2, 1, 2)
        gc_type_counts = df['gc_type'].value_counts()
        plt.bar(gc_type_counts.index, gc_type_counts.values)
        plt.title('GC Type Distribution')
        plt.ylabel('Count')
        plt.xticks(rotation=45)

        plt.tight_layout()
        plt.savefig('gc_analysis.png', dpi=300, bbox_inches='tight')
        plt.show()

# Usage example
if __name__ == "__main__":
    # Parse the Elasticsearch GC log, print the report, then draw the charts.
    analyzer = GCLogAnalyzer('/var/log/elasticsearch/gc.log')
    analyzer.parse_gc_log()
    gc_report = analyzer.generate_gc_report()
    print(gc_report)
    analyzer.plot_gc_timeline()

3. Logstash性能问题

管道性能监控

Logstash性能监控脚本:

# logstash_monitor.py
import requests
import json
import time
from datetime import datetime

class LogstashMonitor:
    """Poll the Logstash monitoring API and flag pipeline performance problems."""

    # Seconds to wait for the monitoring API before giving up on a poll.
    REQUEST_TIMEOUT = 10

    def __init__(self, logstash_host="http://localhost:9600"):
        """Store the base URL of the Logstash monitoring API."""
        self.logstash_host = logstash_host

    def _get_json(self, path, label):
        """GET ``path`` and return the decoded JSON, or ``None`` on failure.

        ``label`` is only used in the printed error message.
        """
        try:
            response = requests.get(
                f"{self.logstash_host}{path}",
                timeout=self.REQUEST_TIMEOUT
            )
            # Treat HTTP error statuses as failures rather than parsing them.
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as e:
            print(f"Error getting {label}: {e}")
            return None

    def get_node_stats(self):
        """Return the ``/_node/stats`` document, or ``None`` on failure."""
        return self._get_json("/_node/stats", "node stats")

    def get_pipeline_stats(self):
        """Return the ``/_node/stats/pipelines`` document, or ``None`` on failure."""
        return self._get_json("/_node/stats/pipelines", "pipeline stats")

    def analyze_pipeline_performance(self, stats):
        """Derive per-pipeline figures from a pipeline-stats document.

        Returns ``None`` when ``stats`` is empty or has no ``pipelines`` key,
        otherwise ``{pipeline_id: {...}}`` with the raw event counters plus
        ``throughput_events_per_second`` and ``event_loss_rate`` (percent).
        Missing counters are treated as 0.
        """
        if not stats or 'pipelines' not in stats:
            return None

        analysis = {}

        for pipeline_id, pipeline_stats in stats['pipelines'].items():
            # 'events' can be present but null; fall back to an empty dict
            # so the lookups below yield zeros instead of raising.
            events = pipeline_stats.get('events') or {}

            events_in = events.get('in', 0)
            events_out = events.get('out', 0)
            duration_ms = events.get('duration_in_millis', 0)

            # Throughput: events received per second of recorded duration
            throughput = events_in * 1000 / duration_ms if duration_ms > 0 else 0

            # Loss rate: share of received events that have not been emitted
            loss_rate = (events_in - events_out) / events_in * 100 if events_in > 0 else 0

            analysis[pipeline_id] = {
                'events_in': events_in,
                'events_out': events_out,
                'events_filtered': events.get('filtered', 0),
                'duration_in_millis': duration_ms,
                'queue_push_duration_in_millis': events.get('queue_push_duration_in_millis', 0),
                'throughput_events_per_second': throughput,
                'event_loss_rate': loss_rate
            }

        return analysis

    def detect_performance_issues(self, analysis):
        """Return issue dicts for pipelines that cross the fixed thresholds.

        Thresholds: throughput below 100 events/sec (warning), loss rate above
        5% (critical), queue push duration above 1000 ms (warning).
        """
        issues = []

        for pipeline_id, pipeline_data in analysis.items():
            # Throughput
            if pipeline_data['throughput_events_per_second'] < 100:
                issues.append({
                    'severity': 'warning',
                    'pipeline': pipeline_id,
                    'issue': f"Low throughput: {pipeline_data['throughput_events_per_second']:.2f} events/sec",
                    'recommendation': "Check filter complexity and output performance"
                })

            # Event loss
            if pipeline_data['event_loss_rate'] > 5:
                issues.append({
                    'severity': 'critical',
                    'pipeline': pipeline_id,
                    'issue': f"High event loss rate: {pipeline_data['event_loss_rate']:.2f}%",
                    'recommendation': "Check for filter errors and output failures"
                })

            # Queue latency
            if pipeline_data['queue_push_duration_in_millis'] > 1000:
                issues.append({
                    'severity': 'warning',
                    'pipeline': pipeline_id,
                    'issue': f"High queue push duration: {pipeline_data['queue_push_duration_in_millis']}ms",
                    'recommendation': "Consider increasing queue size or workers"
                })

        return issues

    def generate_performance_report(self, analysis):
        """Return a text report listing the figures of every pipeline."""
        report = "\n=== Logstash Performance Report ===\n"
        report += f"Report generated at: {datetime.now()}\n\n"

        for pipeline_id, pipeline_data in analysis.items():
            report += f"Pipeline: {pipeline_id}\n"
            report += f"  Events In: {pipeline_data['events_in']:,}\n"
            report += f"  Events Out: {pipeline_data['events_out']:,}\n"
            report += f"  Events Filtered: {pipeline_data['events_filtered']:,}\n"
            report += f"  Throughput: {pipeline_data['throughput_events_per_second']:.2f} events/sec\n"
            report += f"  Event Loss Rate: {pipeline_data['event_loss_rate']:.2f}%\n"
            report += f"  Queue Push Duration: {pipeline_data['queue_push_duration_in_millis']}ms\n\n"

        return report

    def monitor_performance(self, duration_minutes=30, interval_seconds=60):
        """Poll pipeline stats every ``interval_seconds`` for ``duration_minutes``.

        Prints detected issues and the throughput of every pipeline on each
        poll. Failed polls are skipped.
        """
        print(f"Starting Logstash performance monitoring for {duration_minutes} minutes...")

        end_time = time.time() + (duration_minutes * 60)

        while time.time() < end_time:
            stats = self.get_pipeline_stats()
            if stats:
                analysis = self.analyze_pipeline_performance(stats)
                if analysis:
                    # Report issues
                    issues = self.detect_performance_issues(analysis)
                    if issues:
                        print(f"\n[{datetime.now()}] Performance issues detected:")
                        for issue in issues:
                            print(f"  {issue['severity'].upper()}: {issue['pipeline']} - {issue['issue']}")

                    # Current status
                    print(f"[{datetime.now()}] Pipeline status:")
                    for pipeline_id, pipeline_data in analysis.items():
                        print(f"  {pipeline_id}: {pipeline_data['throughput_events_per_second']:.2f} events/sec")

            time.sleep(interval_seconds)

# Usage example
if __name__ == "__main__":
    # Watch the default local Logstash API for 30 minutes, polling every minute.
    monitor = LogstashMonitor()
    monitor.monitor_performance(30, 60)

管道配置优化

性能优化配置示例:

# optimized-logstash.conf
#
# Pipeline with two inputs (Beats, Kafka), per-log-type filtering and
# per-log-type Elasticsearch outputs. Options that the plugins do not define or
# have retired were removed; each removal is noted where the option used to be.
input {
  beats {
    port => 5044
    # Decode the event payload as JSON.
    codec => "json"
    # NOTE: receive_buffer_bytes was removed; it is an option of the udp
    # input, not of the beats input.
  }
  
  # Kafka input; consumer_threads gives parallel consumption
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    topics => ["logs"]
    group_id => "logstash-group"
    consumer_threads => 4
    # Batch fetching
    fetch_min_bytes => 1024
    fetch_max_wait_ms => 500
    max_poll_records => 1000
    # NOTE: compression_type was removed; compression is chosen by the
    # producer (kafka output), not by the kafka input.
  }
}

filter {
  # Conditionals keep each event on the cheapest applicable path
  if [fields][log_type] == "nginx" {
    grok {
      match => { "message" => "%{NGINXACCESS}" }
      # Bound the time a single match may take and tag events that exceed it
      timeout_millis => 30000
      tag_on_timeout => "_groktimeout"
    }
    
    # Only look up geo data when the client address was extracted
    if [client_ip] {
      # Lookup results are cached by the plugin
      geoip {
        source => "client_ip"
        target => "geoip"
        cache_size => 10000
      }
    }
    
    # Use the request time from the log line as the event timestamp
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      target => "@timestamp"
    }
  }
  
  # Application logs
  else if [fields][log_type] == "application" {
    # JSON parsing
    json {
      source => "message"
      target => "app"
      # Leave events with invalid JSON untouched instead of tagging a failure
      skip_on_invalid_json => true
    }
    
    # NOTE: the multiline filter block was removed. The multiline filter is
    # not bundled with current Logstash releases, so the pipeline would fail
    # to load. Join multi-line messages at the shipper instead (Filebeat
    # multiline settings) before they reach the beats input.
  }
  
  # Common field handling
  mutate {
    # Drop fields that are not needed downstream
    remove_field => [ "[beat][hostname]", "[beat][version]", "prospector" ]
    # Type conversion
    convert => {
      "response_time" => "float"
      "status_code" => "integer"
    }
  }
  
  # Mark events that grok could not parse
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => { "parse_error" => "grok_failure" }
    }
  }
}

output {
  # Route by log type so each event is written once to its own index
  if [fields][log_type] == "nginx" {
    elasticsearch {
      hosts => ["es1:9200", "es2:9200", "es3:9200"]
      index => "nginx-logs-%{+YYYY.MM.dd}"
      
      # NOTE: flush_size / idle_flush_time were removed; they are obsolete in
      # the elasticsearch output. Bulk size and delay are controlled by
      # pipeline.batch.size and pipeline.batch.delay in logstash.yml.
      
      # Connection pool
      pool_max => 1000
      pool_max_per_route => 100
      
      # Retry back-off ceiling in seconds.
      # NOTE: retry_max_times was removed; it is not an option of this plugin.
      retry_max_interval => 5
      
      # Index template.
      # NOTE: template_pattern was removed; the index pattern belongs inside
      # the template file itself.
      template_name => "nginx-logs"
      template => "/etc/logstash/templates/nginx-template.json"
      template_overwrite => true
    }
  }
  
  else if [fields][log_type] == "application" {
    elasticsearch {
      hosts => ["es1:9200", "es2:9200", "es3:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
      # Batch size is governed by the pipeline.batch.* settings (see above).
    }
  }
  
  # Parse failures additionally go to their own index
  if "_grokparsefailure" in [tags] or "_jsonparsefailure" in [tags] {
    elasticsearch {
      hosts => ["es1:9200", "es2:9200", "es3:9200"]
      index => "parse-errors-%{+YYYY.MM.dd}"
    }
  }
  
  # Debug output (development only)
  if [debug] == "true" {
    stdout {
      codec => rubydebug
    }
  }
}

4. Kibana性能问题

查询性能优化

Kibana查询优化脚本:

// kibana-query-optimizer.js
/**
 * Queries Elasticsearch through Kibana's console proxy and turns the result
 * of a "slow query" aggregation into optimization recommendations.
 */
class KibanaQueryOptimizer {
    /**
     * @param {string} kibanaUrl Base URL of Kibana, without a trailing slash.
     * @param {string} username  Basic-auth user name.
     * @param {string} password  Basic-auth password.
     */
    constructor(kibanaUrl, username, password) {
        this.kibanaUrl = kibanaUrl;
        this.auth = btoa(`${username}:${password}`);
    }

    /**
     * Send one request through the console proxy.
     *
     * The proxy endpoint itself only accepts POST; the HTTP method used
     * against Elasticsearch is passed in the `method` query parameter.
     *
     * @param {string} path     Elasticsearch path, e.g. `_cat/indices`.
     * @param {string} esMethod Method to use against Elasticsearch.
     * @param {object} [body]   Optional JSON body.
     * @returns {Promise<Response>} The response; rejects on a non-2xx status.
     */
    async proxyRequest(path, esMethod, body) {
        const options = {
            method: 'POST',
            headers: {
                'Authorization': `Basic ${this.auth}`,
                'Content-Type': 'application/json',
                'kbn-xsrf': 'true'
            }
        };
        if (body !== undefined) {
            options.body = JSON.stringify(body);
        }

        const response = await fetch(
            `${this.kibanaUrl}/api/console/proxy?path=${path}&method=${esMethod}`,
            options
        );
        if (!response.ok) {
            throw new Error(`Console proxy request failed: ${response.status} ${response.statusText}`);
        }
        return response;
    }

    /**
     * Log the index listing, then fetch the slow-query metrics.
     *
     * @returns {Promise<object|undefined>} The metrics response, or
     *     `undefined` when a request failed (the error is logged).
     */
    async analyzeSlowQueries() {
        try {
            // List the indices for context (this is not a slow-query log).
            const response = await this.proxyRequest('_cat/indices', 'GET');

            const indices = await response.text();
            console.log('Indices analysis:', indices);

            // Fetch the query performance metrics
            return this.getQueryPerformanceMetrics();
        } catch (error) {
            console.error('Error analyzing slow queries:', error);
        }
    }

    /**
     * Run the slow-query aggregation against `.kibana` for the last hour.
     *
     * Documents with `took >= 1000` are counted as slow; the aggregation
     * also returns their average `took` and the top 10 `types.keyword` terms.
     *
     * @returns {Promise<object|undefined>} The parsed search response, or
     *     `undefined` when the request failed (the error is logged).
     */
    async getQueryPerformanceMetrics() {
        const metricsQuery = {
            "aggs": {
                "slow_queries": {
                    "filter": {
                        "range": {
                            "took": {
                                "gte": 1000
                            }
                        }
                    },
                    "aggs": {
                        "avg_duration": {
                            "avg": {
                                "field": "took"
                            }
                        },
                        "query_types": {
                            "terms": {
                                "field": "types.keyword",
                                "size": 10
                            }
                        }
                    }
                }
            },
            "size": 0,
            "query": {
                "range": {
                    "@timestamp": {
                        "gte": "now-1h"
                    }
                }
            }
        };

        try {
            const response = await this.proxyRequest('.kibana/_search', 'POST', metricsQuery);
            return await response.json();
        } catch (error) {
            console.error('Error getting query metrics:', error);
        }
    }

    /**
     * Build recommendations from a metrics response.
     *
     * @param {object|undefined} metrics Result of getQueryPerformanceMetrics().
     * @returns {Array<object>} Zero, one or two recommendation objects; empty
     *     when `metrics` is missing or has no aggregations.
     */
    generateOptimizationRecommendations(metrics) {
        const recommendations = [];

        if (metrics && metrics.aggregations) {
            const slowQueries = metrics.aggregations.slow_queries;

            if (slowQueries.doc_count > 0) {
                recommendations.push({
                    type: 'performance',
                    severity: 'high',
                    message: `Found ${slowQueries.doc_count} slow queries in the last hour`,
                    suggestions: [
                        'Add time range filters to limit data scope',
                        'Use field filters instead of full-text search when possible',
                        'Consider using data views with appropriate field mappings',
                        'Optimize index patterns and field selections'
                    ]
                });
            }

            if (slowQueries.avg_duration && slowQueries.avg_duration.value > 5000) {
                recommendations.push({
                    type: 'performance',
                    severity: 'critical',
                    message: `Average query duration is very high: ${slowQueries.avg_duration.value}ms`,
                    suggestions: [
                        'Review and optimize Elasticsearch cluster performance',
                        'Consider increasing cluster resources',
                        'Implement query result caching',
                        'Use aggregation-based visualizations instead of raw data'
                    ]
                });
            }
        }

        return recommendations;
    }
}

// Usage example
const optimizer = new KibanaQueryOptimizer(
    'http://localhost:5601',
    'elastic',
    'your_password'
);

// Fetch the metrics, derive recommendations from them and print the result.
(async () => {
    const metrics = await optimizer.analyzeSlowQueries();
    const recommendations = optimizer.generateOptimizationRecommendations(metrics);
    console.log('Optimization recommendations:', recommendations);
})();

性能调优策略

1. Elasticsearch性能调优

索引优化

索引设置优化:

{
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 1,
      "refresh_interval": "30s",
      "translog": {
        "flush_threshold_size": "1gb",
        "sync_interval": "30s",
        "durability": "async"
      },
      "merge": {
        "policy": {
          "max_merge_at_once": 10,
          "max_merged_segment": "5gb",
          "segments_per_tier": 10
        }
      },
      "codec": "best_compression",
      "routing": {
        "allocation": {
          "total_shards_per_node": 3
        }
      },
      "queries": {
        "cache": {
          "enabled": true
        }
      },
      "requests": {
        "cache": {
          "enable": true
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      "message": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "level": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "response_time": {
        "type": "float",
        "index": false
      },
      "user_agent": {
        "type": "text",
        "analyzer": "standard",
        "index": false
      }
    }
  }
}

查询优化

查询性能优化示例:

{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-1h",
              "lte": "now"
            }
          }
        },
        {
          "term": {
            "level": "ERROR"
          }
        }
      ],
      "must": [
        {
          "match": {
            "message": {
              "query": "exception",
              "operator": "and"
            }
          }
        }
      ]
    }
  },
  "aggs": {
    "error_trends": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "5m",
        "min_doc_count": 1
      },
      "aggs": {
        "top_errors": {
          "terms": {
            "field": "error_type.keyword",
            "size": 5
          }
        }
      }
    }
  },
  "size": 0,
  "timeout": "30s"
}

2. 系统级优化

操作系统调优

系统优化脚本:

#!/bin/bash
# elasticsearch-system-tuning.sh
#
# Applies kernel, resource-limit and block-device settings for an
# Elasticsearch host and installs a small system monitor script.
#
# Must be run as root. Safe to re-run: a configuration line is appended
# only when that exact line is not already present in the target file.
#
# Environment:
#   ES_DISK_DEVICE  block device that backs the Elasticsearch data path
#                   (default: sda)

SYSCTL_CONF="/etc/sysctl.conf"
LIMITS_CONF="/etc/security/limits.conf"
DISK_DEVICE="${ES_DISK_DEVICE:-sda}"
QUEUE_DIR="/sys/block/${DISK_DEVICE}/queue"

# Every file touched below is root-owned; fail early with a clear message
# instead of a series of "Permission denied" errors.
if [ "$(id -u)" -ne 0 ]; then
    echo "This script must be run as root." >&2
    exit 1
fi

# Append LINE to FILE unless FILE already contains exactly that line.
# Usage: append_once LINE FILE
append_once() {
    local line="$1" file="$2"
    grep -qxF -- "$line" "$file" 2>/dev/null || echo "$line" >> "$file"
}

echo "Starting Elasticsearch system tuning..."

# Memory settings
echo "Configuring memory settings..."
append_once 'vm.max_map_count=262144' "$SYSCTL_CONF"
append_once 'vm.swappiness=1' "$SYSCTL_CONF"
append_once 'vm.dirty_ratio=15' "$SYSCTL_CONF"
append_once 'vm.dirty_background_ratio=5' "$SYSCTL_CONF"

# File descriptor, process and locked-memory limits for the elasticsearch user
echo "Configuring file descriptor limits..."
append_once 'elasticsearch soft nofile 65536' "$LIMITS_CONF"
append_once 'elasticsearch hard nofile 65536' "$LIMITS_CONF"
append_once 'elasticsearch soft nproc 4096' "$LIMITS_CONF"
append_once 'elasticsearch hard nproc 4096' "$LIMITS_CONF"
append_once 'elasticsearch soft memlock unlimited' "$LIMITS_CONF"
append_once 'elasticsearch hard memlock unlimited' "$LIMITS_CONF"

# Network settings
echo "Configuring network settings..."
append_once 'net.core.somaxconn=65535' "$SYSCTL_CONF"
append_once 'net.core.netdev_max_backlog=5000' "$SYSCTL_CONF"
append_once 'net.ipv4.tcp_max_syn_backlog=65535' "$SYSCTL_CONF"
append_once 'net.ipv4.tcp_keepalive_time=600' "$SYSCTL_CONF"
append_once 'net.ipv4.tcp_keepalive_intvl=60' "$SYSCTL_CONF"
append_once 'net.ipv4.tcp_keepalive_probes=3' "$SYSCTL_CONF"

# Disk I/O tuning
# NOTE: sysfs writes take effect immediately but are not persistent; they
# must be re-applied after a reboot (for example from a udev rule).
echo "Configuring disk I/O settings..."
if [ -d "$QUEUE_DIR" ]; then
    # Kernels using the multi-queue block layer expose the deadline
    # scheduler as "mq-deadline"; older ones call it "deadline". Pick
    # whichever this device actually offers. The mq- name is tested first
    # because a plain "deadline" match would also hit "mq-deadline".
    if grep -q 'mq-deadline' "${QUEUE_DIR}/scheduler"; then
        echo 'mq-deadline' > "${QUEUE_DIR}/scheduler"
    elif grep -q 'deadline' "${QUEUE_DIR}/scheduler"; then
        echo 'deadline' > "${QUEUE_DIR}/scheduler"
    else
        echo "No deadline scheduler available for ${DISK_DEVICE}; leaving scheduler unchanged." >&2
    fi
    echo '32' > "${QUEUE_DIR}/nr_requests"
    echo '256' > "${QUEUE_DIR}/read_ahead_kb"
else
    echo "Block device ${DISK_DEVICE} not found; skipping disk I/O settings (set ES_DISK_DEVICE)." >&2
fi

# Load the sysctl settings written above
sysctl -p

echo "System tuning completed. Please reboot for all changes to take effect."

# Install the monitor script. The heredoc delimiter is quoted so that
# nothing inside it is expanded while it is being written out.
cat > /usr/local/bin/es-system-monitor.sh << 'EOF'
#!/bin/bash
# Elasticsearch系统监控脚本

echo "=== Elasticsearch System Monitor ==="
echo "Timestamp: $(date)"
echo

# 内存使用
echo "Memory Usage:"
free -h
echo

# 磁盘使用
echo "Disk Usage:"
df -h | grep -E '(Filesystem|/dev/)'
echo

# 网络连接
echo "Network Connections:"
ss -tuln | grep :9200
echo

# 文件描述符
echo "File Descriptors:"
cat /proc/sys/fs/file-nr
echo

# 系统负载
echo "System Load:"
uptime
echo

# JVM进程信息
echo "Elasticsearch JVM:"
ps aux | grep elasticsearch | grep -v grep
EOF

chmod +x /usr/local/bin/es-system-monitor.sh

echo "System monitor script created at /usr/local/bin/es-system-monitor.sh"

磁盘I/O优化

磁盘性能测试脚本:

#!/bin/bash
# disk-performance-test.sh
#
# Benchmarks the Elasticsearch data directory with dd (sequential I/O) and
# fio (random and mixed I/O), then prints rule-of-thumb targets.
#
# The test writes temporary files next to live index data and generates
# heavy I/O; NOTE(review): presumably this should be run in a maintenance
# window rather than on a busy node.

DATA_DIR="/var/lib/elasticsearch"
TEST_FILE="${DATA_DIR}/disk_test"
TEST_SIZE="1G"

# Remove every benchmark file this script may have created.
cleanup() {
    rm -f "${TEST_FILE}_"*
}

# Clean up on any exit path so that an interrupted run does not leave
# multi-gigabyte files behind in the data directory.
trap cleanup EXIT
trap 'exit 1' INT TERM

if [ ! -d "$DATA_DIR" ] || [ ! -w "$DATA_DIR" ]; then
    echo "Data directory $DATA_DIR does not exist or is not writable." >&2
    exit 1
fi

# Run one fio job against TARGET and print only the lines matching PATTERN.
# Usage: run_fio NAME RW_MODE TARGET PATTERN [extra fio options...]
run_fio() {
    local name="$1" mode="$2" target="$3" pattern="$4"
    shift 4

    # fio's stderr is discarded below, so a missing binary would otherwise
    # look like a test that simply produced no output.
    if ! command -v fio >/dev/null 2>&1; then
        echo "fio is not installed; skipping this test." >&2
        return 0
    fi

    fio --name="$name" --ioengine=libaio --iodepth=32 --rw="$mode" "$@" \
        --bs=4k --direct=1 --size="$TEST_SIZE" --numjobs=1 --runtime=60 --group_reporting \
        --filename="$target" 2>/dev/null | \
        grep -E "$pattern"
}

echo "Starting disk performance test for Elasticsearch data directory..."
echo "Test directory: $DATA_DIR"
echo "Test file size: $TEST_SIZE"
echo

# Sequential write test (1024 x 1 MiB blocks, i.e. the same 1G as TEST_SIZE)
echo "=== Sequential Write Test ==="
dd if=/dev/zero of="${TEST_FILE}_write" bs=1M count=1024 oflag=direct 2>&1 | \
    grep -E '(copied|MB/s)'
echo

# Sequential read test (reads back the file written above)
echo "=== Sequential Read Test ==="
dd if="${TEST_FILE}_write" of=/dev/null bs=1M iflag=direct 2>&1 | \
    grep -E '(copied|MB/s)'
echo

# Random write test
echo "=== Random Write Test ==="
run_fio random-write randwrite "${TEST_FILE}_random" '(write:|IOPS=|BW=)'
echo

# Random read test (reuses the file laid out by the random write test)
echo "=== Random Read Test ==="
run_fio random-read randread "${TEST_FILE}_random" '(read:|IOPS=|BW=)'
echo

# Mixed read/write test (70% reads)
echo "=== Mixed Read/Write Test ==="
run_fio mixed-rw randrw "${TEST_FILE}_mixed" '(read:|write:|IOPS=|BW=)' --rwmixread=70
echo

# Remove the test files now; the EXIT trap only acts as a safety net.
cleanup

echo "Disk performance test completed."
echo
echo "Performance Recommendations:"
echo "- Sequential Write should be > 100 MB/s for good performance"
echo "- Sequential Read should be > 200 MB/s for good performance"
echo "- Random Write IOPS should be > 1000 for SSD, > 100 for HDD"
echo "- Random Read IOPS should be > 5000 for SSD, > 150 for HDD"

总结

本章详细介绍了ELK Stack的故障排除与性能调优,包括:

核心内容

  1. 常见故障诊断 - 集群状态、分片分配、内存问题诊断
  2. 性能监控 - JVM监控、GC分析、管道性能监控
  3. 配置优化 - 索引设置、查询优化、管道配置优化
  4. 系统级调优 - 操作系统参数、磁盘I/O优化

技术亮点

  • 全面的诊断工具和脚本
  • 自动化性能监控和告警
  • 详细的优化配置示例
  • 系统级性能调优指南

实践价值

  • 快速定位和解决生产环境问题
  • 提升系统整体性能和稳定性
  • 建立完善的监控和告警体系
  • 优化资源使用效率

下一章将学习ELK Stack的安全配置与权限管理,确保系统的安全性和合规性。