1. Performance Monitoring and Diagnostics

1.1 Cluster Health Monitoring

# Cluster health status
GET /_cluster/health

# Detailed cluster health information
GET /_cluster/health?level=indices

# Node statistics
GET /_nodes/stats

# Index statistics
GET /_stats

# Hot threads analysis
GET /_nodes/hot_threads

# Task management
GET /_tasks

# Slow query log
PUT /my_index/_settings
{
  "index.search.slowlog.threshold.query.warn": "10s",
  "index.search.slowlog.threshold.query.info": "5s",
  "index.search.slowlog.threshold.query.debug": "2s",
  "index.search.slowlog.threshold.query.trace": "500ms",
  "index.search.slowlog.threshold.fetch.warn": "1s",
  "index.search.slowlog.threshold.fetch.info": "800ms",
  "index.search.slowlog.threshold.fetch.debug": "500ms",
  "index.search.slowlog.threshold.fetch.trace": "200ms",
  "index.indexing.slowlog.threshold.index.warn": "10s",
  "index.indexing.slowlog.threshold.index.info": "5s",
  "index.indexing.slowlog.threshold.index.debug": "2s",
  "index.indexing.slowlog.threshold.index.trace": "500ms"
}
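
The same thresholds can also be applied and verified through the Python client. A minimal sketch, assuming a local cluster and the my_index index from the request above:

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

# Apply a subset of the slow-log thresholds shown above
es.indices.put_settings(
    index="my_index",
    body={
        "index.search.slowlog.threshold.query.warn": "10s",
        "index.search.slowlog.threshold.fetch.warn": "1s"
    }
)

# Read the settings back to confirm they took effect
print(es.indices.get_settings(index="my_index", name="index.search.slowlog.*"))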

1.2 Performance Monitoring Script

import requests
import json
import time
from datetime import datetime
from typing import Dict, List, Any

class ElasticsearchMonitor:
    def __init__(self, host: str = "localhost", port: int = 9200):
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()
    
    def get_cluster_health(self) -> Dict[str, Any]:
        """获取集群健康状态"""
        response = self.session.get(f"{self.base_url}/_cluster/health")
        return response.json()
    
    def get_node_stats(self) -> Dict[str, Any]:
        """获取节点统计信息"""
        response = self.session.get(f"{self.base_url}/_nodes/stats")
        return response.json()
    
    def get_index_stats(self, index: str = None) -> Dict[str, Any]:
        """获取索引统计信息"""
        url = f"{self.base_url}/_stats"
        if index:
            url = f"{self.base_url}/{index}/_stats"
        response = self.session.get(url)
        return response.json()
    
    def analyze_performance(self) -> Dict[str, Any]:
        """性能分析"""
        cluster_health = self.get_cluster_health()
        node_stats = self.get_node_stats()
        index_stats = self.get_index_stats()
        
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "cluster_status": cluster_health["status"],
            "active_shards": cluster_health["active_shards"],
            "relocating_shards": cluster_health["relocating_shards"],
            "unassigned_shards": cluster_health["unassigned_shards"],
            "nodes": {},
            "indices": {},
            "alerts": []
        }
        
        # Per-node analysis
        for node_id, node_data in node_stats["nodes"].items():
            node_name = node_data["name"]
            jvm = node_data["jvm"]
            os_data = node_data["os"]
            
            # JVM heap usage percentage
            heap_used_percent = jvm["mem"]["heap_used_percent"]
            
            # CPU usage percentage
            cpu_percent = os_data["cpu"]["percent"]
            
            # Disk usage
            fs_data = node_data["fs"]["total"]
            disk_used_percent = (fs_data["total_in_bytes"] - fs_data["available_in_bytes"]) / fs_data["total_in_bytes"] * 100
            
            analysis["nodes"][node_name] = {
                "heap_used_percent": heap_used_percent,
                "cpu_percent": cpu_percent,
                "disk_used_percent": disk_used_percent,
                "gc_time": jvm["gc"]["collectors"]["young"]["collection_time_in_millis"] + 
                          jvm["gc"]["collectors"]["old"]["collection_time_in_millis"]
            }
            
            # Generate alerts
            if heap_used_percent > 85:
                analysis["alerts"].append(f"Node {node_name} JVM heap usage is too high: {heap_used_percent}%")
            
            if cpu_percent > 80:
                analysis["alerts"].append(f"Node {node_name} CPU usage is too high: {cpu_percent}%")
            
            if disk_used_percent > 85:
                analysis["alerts"].append(f"Node {node_name} disk usage is too high: {disk_used_percent:.1f}%")
        
        # Per-index analysis (note: these stats are cumulative counters, not instantaneous rates)
        for index_name, index_data in index_stats["indices"].items():
            primaries = index_data["primaries"]
            
            analysis["indices"][index_name] = {
                "docs_count": primaries["docs"]["count"],
                "store_size_mb": primaries["store"]["size_in_bytes"] / 1024 / 1024,
                "indexing_total": primaries["indexing"]["index_total"],
                "search_total": primaries["search"]["query_total"],
                "avg_search_time": primaries["search"]["query_time_in_millis"] / max(primaries["search"]["query_total"], 1)
            }
        
        return analysis
    
    def continuous_monitoring(self, interval: int = 60, duration: int = 3600):
        """持续监控"""
        start_time = time.time()
        
        while time.time() - start_time < duration:
            try:
                analysis = self.analyze_performance()
                
                print(f"\n=== 监控报告 {analysis['timestamp']} ===")
                print(f"集群状态: {analysis['cluster_status']}")
                print(f"活跃分片: {analysis['active_shards']}")
                print(f"未分配分片: {analysis['unassigned_shards']}")
                
                # 节点状态
                print("\n节点状态:")
                for node_name, node_data in analysis["nodes"].items():
                    print(f"  {node_name}:")
                    print(f"    JVM堆内存: {node_data['heap_used_percent']}%")
                    print(f"    CPU: {node_data['cpu_percent']}%")
                    print(f"    磁盘: {node_data['disk_used_percent']:.1f}%")
                
                # 告警信息
                if analysis["alerts"]:
                    print("\n⚠️ 告警信息:")
                    for alert in analysis["alerts"]:
                        print(f"  - {alert}")
                
                time.sleep(interval)
                
            except Exception as e:
                print(f"监控异常: {e}")
                time.sleep(interval)

# Usage example
if __name__ == "__main__":
    monitor = ElasticsearchMonitor()
    
    # One-off analysis
    analysis = monitor.analyze_performance()
    print(json.dumps(analysis, indent=2, ensure_ascii=False))
    
    # Continuous monitoring (run for 1 hour, checking every minute)
    # monitor.continuous_monitoring(interval=60, duration=3600)

2. Index Performance Optimization

2.1 Index Design Optimization

# Optimized index settings
PUT /optimized_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "index.codec": "best_compression",
    "index.merge.policy.max_merge_at_once": 5,
    "index.merge.policy.segments_per_tier": 5,
    "index.translog.flush_threshold_size": "1gb",
    "index.translog.sync_interval": "30s",
    "index.max_result_window": 50000,
    "index.mapping.total_fields.limit": 2000,
    "index.mapping.depth.limit": 20,
    "index.mapping.nested_fields.limit": 100
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword",
        "store": true
      },
      "title": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "content": {
        "type": "text",
        "analyzer": "standard",
        "index_options": "positions"
      },
      "tags": {
        "type": "keyword"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "metadata": {
        "type": "object",
        "enabled": false
      }
    }
  }
}

# Dynamic cluster settings that help bulk indexing
PUT /_cluster/settings
{
  "transient": {
    "cluster.routing.allocation.node_concurrent_recoveries": 4,
    "cluster.routing.allocation.cluster_concurrent_rebalance": 2
  }
}

# indices.memory.index_buffer_size is a static node setting and must go in elasticsearch.yml:
# indices.memory.index_buffer_size: 20%

2.2 Bulk Indexing Optimization

from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk, streaming_bulk
import json
import time
from typing import List, Dict, Any, Generator

class OptimizedIndexer:
    def __init__(self, hosts: List[str] = None):
        self.es = Elasticsearch(
            hosts or ['localhost:9200'],
            # Connection pool tuning
            maxsize=25,
            max_retries=3,
            retry_on_timeout=True,
            # Timeout settings
            timeout=30
        )
    
    def prepare_for_bulk_indexing(self, index_name: str):
        """为批量索引做准备"""
        # 临时禁用刷新
        self.es.indices.put_settings(
            index=index_name,
            body={
                "refresh_interval": "-1",
                "number_of_replicas": 0
            }
        )
    
    def restore_after_bulk_indexing(self, index_name: str):
        """批量索引后恢复设置"""
        # 恢复正常设置
        self.es.indices.put_settings(
            index=index_name,
            body={
                "refresh_interval": "30s",
                "number_of_replicas": 1
            }
        )
        
        # Force a refresh
        self.es.indices.refresh(index=index_name)
        
        # Force-merge segments (this can take a long time on large indices)
        self.es.indices.forcemerge(
            index=index_name,
            max_num_segments=1
        )
    
    def generate_docs(self, data_source: List[Dict], index_name: str) -> Generator:
        """生成文档"""
        for doc in data_source:
            yield {
                "_index": index_name,
                "_source": doc
            }
    
    def bulk_index_optimized(self, data_source: List[Dict], index_name: str) -> Dict[str, Any]:
        """优化的批量索引"""
        start_time = time.time()
        
        try:
            # Prepare the index for bulk indexing
            self.prepare_for_bulk_indexing(index_name)
            
            # Execute the bulk indexing
            success_count = 0
            error_count = 0
            
            # Use parallel_bulk for concurrent processing
            for success, info in parallel_bulk(
                self.es,
                self.generate_docs(data_source, index_name),
                chunk_size=1000,
                thread_count=4,
                max_chunk_bytes=10 * 1024 * 1024,  # 10MB
                request_timeout=60
            ):
                if success:
                    success_count += 1
                else:
                    error_count += 1
                    print(f"索引错误: {info}")
            
            # Restore settings
            self.restore_after_bulk_indexing(index_name)
            
            elapsed_time = time.time() - start_time
            
            return {
                "success": True,
                "indexed_count": success_count,
                "error_count": error_count,
                "elapsed_time": elapsed_time,
                "docs_per_second": len(data_source) / elapsed_time
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "elapsed_time": time.time() - start_time
            }
    
    def streaming_bulk_index(self, data_generator: Generator, index_name: str) -> Dict[str, Any]:
        """流式批量索引"""
        start_time = time.time()
        success_count = 0
        error_count = 0
        
        try:
            self.prepare_for_bulk_indexing(index_name)
            
            def doc_generator():
                for doc in data_generator:
                    yield {
                        "_index": index_name,
                        "_source": doc
                    }
            
            # Stream documents with streaming_bulk; the plain bulk() helper returns a
            # (success_count, errors) tuple and cannot be iterated per document
            for success, info in streaming_bulk(
                self.es,
                doc_generator(),
                chunk_size=500,
                max_chunk_bytes=5 * 1024 * 1024,
                request_timeout=60,
                max_retries=3,
                initial_backoff=2,
                max_backoff=600
            ):
                if success:
                    success_count += 1
                else:
                    error_count += 1
                    print(f"Indexing error: {info}")
            
            self.restore_after_bulk_indexing(index_name)
            
            elapsed_time = time.time() - start_time
            
            return {
                "success": True,
                "indexed_count": success_count,
                "error_count": error_count,
                "elapsed_time": elapsed_time
            }
        
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "elapsed_time": time.time() - start_time
            }

# Usage example
if __name__ == "__main__":
    indexer = OptimizedIndexer()
    
    # Sample data
    sample_data = [
        {
            "id": f"doc_{i}",
            "title": f"Document {i}",
            "content": f"This is the content of document {i}" * 10,
            "tags": ["tag1", "tag2"],
            "created_at": "2024-01-01 12:00:00"
        }
        for i in range(10000)
    ]
    
    # Bulk indexing
    result = indexer.bulk_index_optimized(sample_data, "test_index")
    print(f"批量索引结果: {result}")

3. Query Performance Optimization

3.1 Query Optimization Techniques

# Use filters instead of scoring queries
GET /products/_search
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"category.keyword": "electronics"}},
        {"range": {"price": {"gte": 100, "lte": 1000}}}
      ]
    }
  }
}

# Use constant_score to skip relevance scoring
GET /products/_search
{
  "query": {
    "constant_score": {
      "filter": {
        "bool": {
          "must": [
            {"term": {"status.keyword": "active"}},
            {"range": {"created_at": {"gte": "2024-01-01"}}}
          ]
        }
      },
      "boost": 1.0
    }
  }
}

# Limit the returned fields
GET /products/_search
{
  "_source": ["id", "title", "price"],
  "query": {
    "match": {"title": "laptop"}
  }
}

# Use search_after for deep pagination
GET /products/_search
{
  "size": 100,
  "sort": [
    {"created_at": "desc"},
    {"_id": "desc"}
  ],
  "search_after": ["2024-01-01T12:00:00", "doc_123"]
}
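
A paging loop built on search_after might look like the sketch below (the products index and the created_at sort field follow the example above; adjust them to your own mapping):

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

def page_through(index: str, page_size: int = 100):
    """Yield all documents from an index using search_after pagination."""
    body = {
        "size": page_size,
        "sort": [{"created_at": "desc"}, {"_id": "desc"}],
        "query": {"match_all": {}}
    }
    search_after = None
    while True:
        if search_after:
            body["search_after"] = search_after
        hits = es.search(index=index, body=body)["hits"]["hits"]
        if not hits:
            break
        for hit in hits:
            yield hit["_source"]
        # The sort values of the last hit become the cursor for the next page
        search_after = hits[-1]["sort"]

# for doc in page_through("products"): ...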

# Use scroll to traverse large result sets
GET /products/_search?scroll=5m
{
  "size": 1000,
  "query": {
    "match_all": {}
  }
}

# Continue the scroll
GET /_search/scroll
{
  "scroll": "5m",
  "scroll_id": "scroll_id_here"
}
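
For exporting large result sets from Python, the helpers.scan wrapper manages the scroll cursor and its cleanup automatically. A minimal sketch, assuming the products index from above:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch(["localhost:9200"])

# scan() issues the initial scroll request and keeps following the scroll_id until exhausted
for hit in scan(
    es,
    index="products",
    query={"query": {"match_all": {}}},
    size=1000,
    scroll="5m"
):
    print(hit["_id"])  # replace with your own per-document processing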

3.2 Query Cache Optimization

# Enable the query cache
PUT /my_index/_settings
{
  "index.queries.cache.enabled": true
}

# A request served by the shard request cache
GET /my_index/_search?request_cache=true
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword"
      }
    }
  }
}

# Clear the query cache
POST /my_index/_cache/clear?query=true
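
The shard request cache pays off when the same size: 0 aggregation is issued repeatedly. A small sketch that runs the aggregation twice and compares latencies (my_index follows the examples above):

import time
from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

agg_body = {
    "size": 0,
    "aggs": {"categories": {"terms": {"field": "category.keyword"}}}
}

# The first request warms the request cache; the second one should be noticeably faster
for attempt in range(2):
    start = time.time()
    es.search(index="my_index", body=agg_body, request_cache=True)
    print(f"attempt {attempt + 1}: {(time.time() - start) * 1000:.1f} ms")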

4. Memory and JVM Optimization

4.1 JVM Configuration Optimization

# Example jvm.options configuration

# Heap size (do not exceed ~32GB; roughly 50% of system memory is the usual guideline)
-Xms16g
-Xmx16g

# Garbage collector settings
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=200
-XX:+UnlockExperimentalVMOptions
-XX:G1NewSizePercent=30
-XX:G1MaxNewSizePercent=40

# GC logging (JDK 9+ unified logging; rotation is configured inline, the legacy
# -XX:+UseGCLogFileRotation flags no longer exist on modern JDKs)
-Xlog:gc*,gc+age=trace,safepoint:file=gc.log:utctime,pid,tags:filecount=32,filesize=64m

# Other options
-Djava.io.tmpdir=/tmp
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log
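
The "50% of RAM, below 32GB" guideline can be turned into concrete -Xms/-Xmx values. A rough helper, where the 31GB cap is an approximation of the compressed-oops threshold:

import psutil

def suggested_heap_gb(compressed_oops_limit_gb: int = 31) -> int:
    """Rule of thumb: half of physical RAM, capped below the compressed-oops threshold."""
    total_gb = psutil.virtual_memory().total / (1024 ** 3)
    return int(min(total_gb / 2, compressed_oops_limit_gb))

heap = suggested_heap_gb()
print(f"-Xms{heap}g")
print(f"-Xmx{heap}g")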

4.2 Memory Usage Monitoring

import psutil
import requests
from typing import Dict, Any

class MemoryMonitor:
    def __init__(self, es_host: str = "localhost", es_port: int = 9200):
        self.es_url = f"http://{es_host}:{es_port}"
    
    def get_system_memory(self) -> Dict[str, Any]:
        """获取系统内存信息"""
        memory = psutil.virtual_memory()
        return {
            "total_gb": memory.total / (1024**3),
            "available_gb": memory.available / (1024**3),
            "used_gb": memory.used / (1024**3),
            "percent": memory.percent
        }
    
    def get_es_memory_usage(self) -> Dict[str, Any]:
        """获取Elasticsearch内存使用情况"""
        try:
            response = requests.get(f"{self.es_url}/_nodes/stats/jvm")
            data = response.json()
            
            memory_info = {}
            for node_id, node_data in data["nodes"].items():
                node_name = node_data["name"]
                jvm = node_data["jvm"]
                
                memory_info[node_name] = {
                    "heap_used_gb": jvm["mem"]["heap_used_in_bytes"] / (1024**3),
                    "heap_max_gb": jvm["mem"]["heap_max_in_bytes"] / (1024**3),
                    "heap_used_percent": jvm["mem"]["heap_used_percent"],
                    "non_heap_used_gb": jvm["mem"]["non_heap_used_in_bytes"] / (1024**3),
                    "gc_young_count": jvm["gc"]["collectors"]["young"]["collection_count"],
                    "gc_young_time_ms": jvm["gc"]["collectors"]["young"]["collection_time_in_millis"],
                    "gc_old_count": jvm["gc"]["collectors"]["old"]["collection_count"],
                    "gc_old_time_ms": jvm["gc"]["collectors"]["old"]["collection_time_in_millis"]
                }
            
            return memory_info
        
        except Exception as e:
            return {"error": str(e)}
    
    def analyze_memory_usage(self) -> Dict[str, Any]:
        """分析内存使用情况"""
        system_memory = self.get_system_memory()
        es_memory = self.get_es_memory_usage()
        
        analysis = {
            "system_memory": system_memory,
            "elasticsearch_memory": es_memory,
            "recommendations": []
        }
        
        # Generate recommendations
        if system_memory["percent"] > 85:
            analysis["recommendations"].append("System memory usage is too high; consider adding memory or optimizing queries")
        
        for node_name, node_memory in es_memory.items():
            if isinstance(node_memory, dict):
                if node_memory["heap_used_percent"] > 85:
                    analysis["recommendations"].append(f"Node {node_name} JVM heap usage is too high")
                
                # Check time spent in GC
                total_gc_time = node_memory["gc_young_time_ms"] + node_memory["gc_old_time_ms"]
                if total_gc_time > 10000:  # 10 seconds
                    analysis["recommendations"].append(f"Node {node_name} is spending too much time in GC; consider tuning JVM parameters")
        
        return analysis

# Usage example
if __name__ == "__main__":
    monitor = MemoryMonitor()
    analysis = monitor.analyze_memory_usage()
    
    print("内存使用分析:")
    print(f"系统内存: {analysis['system_memory']['used_gb']:.2f}GB / {analysis['system_memory']['total_gb']:.2f}GB ({analysis['system_memory']['percent']:.1f}%)")
    
    print("\nElasticsearch内存使用:")
    for node_name, node_memory in analysis["elasticsearch_memory"].items():
        if isinstance(node_memory, dict):
            print(f"  {node_name}:")
            print(f"    堆内存: {node_memory['heap_used_gb']:.2f}GB / {node_memory['heap_max_gb']:.2f}GB ({node_memory['heap_used_percent']}%)")
            print(f"    非堆内存: {node_memory['non_heap_used_gb']:.2f}GB")
            print(f"    GC统计: Young({node_memory['gc_young_count']}次, {node_memory['gc_young_time_ms']}ms), Old({node_memory['gc_old_count']}次, {node_memory['gc_old_time_ms']}ms)")
    
    if analysis["recommendations"]:
        print("\n优化建议:")
        for rec in analysis["recommendations"]:
            print(f"  - {rec}")

5. Disk and Storage Optimization

5.1 Storage Configuration Optimization

# Filesystem optimization
# Add to /etc/fstab
/dev/sdb1 /var/lib/elasticsearch ext4 defaults,noatime,nodiratime 0 2

# Disk scheduler tuning (on blk-mq kernels the equivalent value is "none")
echo noop > /sys/block/sdb/queue/scheduler

# Swap tuning
echo 1 > /proc/sys/vm/swappiness

# File descriptor limits
# Add to /etc/security/limits.conf
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096

# Virtual memory setting
echo 262144 > /proc/sys/vm/max_map_count
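
A quick way to confirm that the kernel parameters above are in effect is to read them back from procfs. A small sketch using the standard Linux paths:

def read_proc(path: str) -> str:
    with open(path) as f:
        return f.read().strip()

# These are the values the commands above are meant to produce
print("vm.max_map_count =", read_proc("/proc/sys/vm/max_map_count"))  # expect >= 262144
print("vm.swappiness    =", read_proc("/proc/sys/vm/swappiness"))     # expect 1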

5.2 Index Storage Optimization

# Use the best_compression codec
PUT /compressed_index
{
  "settings": {
    "index.codec": "best_compression",
    "number_of_shards": 1,
    "number_of_replicas": 0
  }
}

# Hot-cold data separation relies on a custom node attribute (node.attr.temperature below)
# together with index-level allocation filtering; no cluster-wide awareness setting is needed
# (allocation awareness would spread shard copies across temperatures, which is not the goal here)

# Hot node configuration (elasticsearch.yml)
node.attr.temperature: hot

# Cold node configuration (elasticsearch.yml)
node.attr.temperature: cold

# Allocate an index to hot nodes
PUT /hot_index/_settings
{
  "index.routing.allocation.require.temperature": "hot"
}

# Migrate an index to cold nodes
PUT /old_index/_settings
{
  "index.routing.allocation.require.temperature": "cold"
}
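
Migrating aging indices to cold nodes can be scripted against the same allocation setting. A hedged sketch that assumes time-based indices named logs-YYYY.MM.DD:

from datetime import datetime, timedelta
from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

def move_old_indices_to_cold(prefix: str = "logs-", days: int = 30):
    """Relocate indices whose date suffix is older than `days` to cold nodes."""
    cutoff = datetime.now() - timedelta(days=days)
    for name in es.indices.get(index=f"{prefix}*"):
        try:
            index_date = datetime.strptime(name[len(prefix):], "%Y.%m.%d")
        except ValueError:
            continue  # skip indices that do not follow the naming pattern
        if index_date < cutoff:
            es.indices.put_settings(
                index=name,
                body={"index.routing.allocation.require.temperature": "cold"}
            )
            print(f"moved {name} to cold nodes")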

6. Network and Connection Optimization

6.1 Network Configuration Optimization

# elasticsearch.yml network configuration
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# HTTP settings
http.max_content_length: 100mb
http.max_header_size: 8kb
http.max_initial_line_length: 4kb
http.compression: true
http.compression_level: 6

# Transport layer settings (the transport.tcp.* names are deprecated)
transport.compress: true

# Thread pool settings
thread_pool.write.size: 8
thread_pool.write.queue_size: 200
thread_pool.search.size: 13
thread_pool.search.queue_size: 1000
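
Whether the thread pools above are sized appropriately shows up in the rejected counters. A small sketch that polls the cat thread_pool API for rejections:

from elasticsearch import Elasticsearch

es = Elasticsearch(["localhost:9200"])

# Rejections mean the queue filled up; persistent rejections call for tuning or more capacity
rows = es.cat.thread_pool(
    thread_pool_patterns="write,search",
    h="node_name,name,active,queue,rejected",
    format="json"
)
for row in rows:
    if int(row["rejected"]) > 0:
        print(f"{row['node_name']} {row['name']}: {row['rejected']} rejected")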

6.2 Client Connection Optimization

from elasticsearch import Elasticsearch

class OptimizedESClient:
    def __init__(self, hosts: list):
        # Connection pool configuration
        self.es = Elasticsearch(
            hosts,
            # Connection pool settings
            maxsize=25,  # connection pool size
            max_retries=3,  # maximum number of retries
            retry_on_timeout=True,  # retry on timeout
            retry_on_status=(502, 503, 504),  # retry on these HTTP status codes
            
            # Timeout settings
            timeout=30,  # default request timeout (seconds)
            
            # HTTP settings
            http_compress=True,  # enable HTTP compression
            verify_certs=False  # certificate verification may be disabled in development
        )
    
    def bulk_index_with_retry(self, docs: list, index: str):
        """带重试的批量索引"""
        from elasticsearch.helpers import bulk
        
        def generate_docs():
            for doc in docs:
                yield {
                    '_index': index,
                    '_source': doc
                }
        
        try:
            success, failed = bulk(
                self.es,
                generate_docs(),
                chunk_size=1000,
                max_chunk_bytes=10*1024*1024,  # 10MB
                request_timeout=60,
                max_retries=3,
                initial_backoff=2,
                max_backoff=600
            )
            return {'success': success, 'failed': failed}
        
        except Exception as e:
            return {'error': str(e)}
    
    def search_with_fallback(self, index: str, query: dict, fallback_query: dict = None):
        """带降级的搜索"""
        try:
            # Try the primary query first
            response = self.es.search(
                index=index,
                body=query,
                timeout='30s'
            )
            return response
        
        except Exception as e:
            if fallback_query:
                try:
                    # Fall back to the simpler query
                    response = self.es.search(
                        index=index,
                        body=fallback_query,
                        timeout='10s'
                    )
                    response['_fallback'] = True
                    return response
                except Exception as fallback_error:
                    return {'error': f'Primary: {e}, Fallback: {fallback_error}'}
            else:
                return {'error': str(e)}

# Usage example
if __name__ == "__main__":
    client = OptimizedESClient(['localhost:9200'])
    
    # Complex query
    complex_query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"title": "elasticsearch"}},
                    {"range": {"date": {"gte": "2024-01-01"}}}
                ]
            }
        },
        "aggs": {
            "categories": {
                "terms": {"field": "category.keyword"}
            }
        }
    }
    
    # Simple fallback query
    fallback_query = {
        "query": {
            "match": {"title": "elasticsearch"}
        }
    }
    
    result = client.search_with_fallback(
        "my_index", 
        complex_query, 
        fallback_query
    )
    
    if '_fallback' in result:
        print("使用了降级查询")
    
    print(f"搜索结果: {result.get('hits', {}).get('total', 0)} 条")

Chapter Summary

This chapter covered Elasticsearch performance optimization and tuning in detail:

  1. Performance monitoring: cluster health monitoring, performance metric analysis, and continuous monitoring
  2. Index optimization: index design, bulk indexing, and storage optimization techniques
  3. Query optimization: query performance tuning, cache usage, and pagination strategies
  4. Memory optimization: JVM configuration, memory monitoring, and garbage collection tuning
  5. Storage optimization: disk configuration, filesystem tuning, and hot-cold data separation
  6. Network optimization: network configuration and client connection tuning

The next chapter covers Elasticsearch cluster management and operations.

Exercises

  1. Design a performance monitoring system that tracks key cluster metrics in real time
  2. Optimize bulk-import performance for an index with a large volume of data
  3. Analyze and resolve the performance problems of a slow query
  4. Configure JVM and system parameters for a production environment