1. Performance Monitoring and Diagnostics
1.1 Cluster Health Monitoring
# Cluster health status
GET /_cluster/health
# Cluster health broken down per index
GET /_cluster/health?level=indices
# Node statistics
GET /_nodes/stats
# Index statistics
GET /_stats
# Hot threads analysis
GET /_nodes/hot_threads
# Running tasks
GET /_tasks
# Slow log thresholds
PUT /my_index/_settings
{
  "index.search.slowlog.threshold.query.warn": "10s",
  "index.search.slowlog.threshold.query.info": "5s",
  "index.search.slowlog.threshold.query.debug": "2s",
  "index.search.slowlog.threshold.query.trace": "500ms",
  "index.search.slowlog.threshold.fetch.warn": "1s",
  "index.search.slowlog.threshold.fetch.info": "800ms",
  "index.search.slowlog.threshold.fetch.debug": "500ms",
  "index.search.slowlog.threshold.fetch.trace": "200ms",
  "index.indexing.slowlog.threshold.index.warn": "10s",
  "index.indexing.slowlog.threshold.index.info": "5s",
  "index.indexing.slowlog.threshold.index.debug": "2s",
  "index.indexing.slowlog.threshold.index.trace": "500ms"
}
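To confirm the thresholds were applied, read the settings back; the filter_path parameter is optional and just trims the response to the relevant keys (the pattern below assumes the nested settings format the API returns by default):
# Read back the slow log settings
GET /my_index/_settings?filter_path=*.settings.index.search.slowlog,*.settings.index.indexing.slowlog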
1.2 Performance Monitoring Script
import requests
import json
import time
from datetime import datetime
from typing import Dict, Any

class ElasticsearchMonitor:
    def __init__(self, host: str = "localhost", port: int = 9200):
        self.base_url = f"http://{host}:{port}"
        self.session = requests.Session()

    def get_cluster_health(self) -> Dict[str, Any]:
        """Fetch cluster health status."""
        response = self.session.get(f"{self.base_url}/_cluster/health")
        return response.json()

    def get_node_stats(self) -> Dict[str, Any]:
        """Fetch node statistics."""
        response = self.session.get(f"{self.base_url}/_nodes/stats")
        return response.json()

    def get_index_stats(self, index: str = None) -> Dict[str, Any]:
        """Fetch index statistics (cluster-wide if no index is given)."""
        url = f"{self.base_url}/_stats"
        if index:
            url = f"{self.base_url}/{index}/_stats"
        response = self.session.get(url)
        return response.json()

    def analyze_performance(self) -> Dict[str, Any]:
        """Collect stats and derive a performance snapshot with alerts."""
        cluster_health = self.get_cluster_health()
        node_stats = self.get_node_stats()
        index_stats = self.get_index_stats()
        analysis = {
            "timestamp": datetime.now().isoformat(),
            "cluster_status": cluster_health["status"],
            "active_shards": cluster_health["active_shards"],
            "relocating_shards": cluster_health["relocating_shards"],
            "unassigned_shards": cluster_health["unassigned_shards"],
            "nodes": {},
            "indices": {},
            "alerts": []
        }
        # Per-node analysis
        for node_id, node_data in node_stats["nodes"].items():
            node_name = node_data["name"]
            jvm = node_data["jvm"]
            os_data = node_data["os"]
            # JVM heap usage
            heap_used_percent = jvm["mem"]["heap_used_percent"]
            # CPU usage
            cpu_percent = os_data["cpu"]["percent"]
            # Disk usage
            fs_data = node_data["fs"]["total"]
            disk_used_percent = (fs_data["total_in_bytes"] - fs_data["available_in_bytes"]) / fs_data["total_in_bytes"] * 100
            analysis["nodes"][node_name] = {
                "heap_used_percent": heap_used_percent,
                "cpu_percent": cpu_percent,
                "disk_used_percent": disk_used_percent,
                "gc_time": jvm["gc"]["collectors"]["young"]["collection_time_in_millis"] +
                           jvm["gc"]["collectors"]["old"]["collection_time_in_millis"]
            }
            # Generate alerts
            if heap_used_percent > 85:
                analysis["alerts"].append(f"Node {node_name}: JVM heap usage too high: {heap_used_percent}%")
            if cpu_percent > 80:
                analysis["alerts"].append(f"Node {node_name}: CPU usage too high: {cpu_percent}%")
            if disk_used_percent > 85:
                analysis["alerts"].append(f"Node {node_name}: disk usage too high: {disk_used_percent:.1f}%")
        # Per-index analysis (note: index_total/query_total are cumulative counters, not rates)
        for index_name, index_data in index_stats["indices"].items():
            primaries = index_data["primaries"]
            analysis["indices"][index_name] = {
                "docs_count": primaries["docs"]["count"],
                "store_size_mb": primaries["store"]["size_in_bytes"] / 1024 / 1024,
                "indexing_total": primaries["indexing"]["index_total"],
                "search_total": primaries["search"]["query_total"],
                "avg_search_time_ms": primaries["search"]["query_time_in_millis"] / max(primaries["search"]["query_total"], 1)
            }
        return analysis

    def continuous_monitoring(self, interval: int = 60, duration: int = 3600):
        """Poll the cluster on a fixed interval and print a report."""
        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                analysis = self.analyze_performance()
                print(f"\n=== Monitoring report {analysis['timestamp']} ===")
                print(f"Cluster status: {analysis['cluster_status']}")
                print(f"Active shards: {analysis['active_shards']}")
                print(f"Unassigned shards: {analysis['unassigned_shards']}")
                # Node status
                print("\nNodes:")
                for node_name, node_data in analysis["nodes"].items():
                    print(f"  {node_name}:")
                    print(f"    JVM heap: {node_data['heap_used_percent']}%")
                    print(f"    CPU: {node_data['cpu_percent']}%")
                    print(f"    Disk: {node_data['disk_used_percent']:.1f}%")
                # Alerts
                if analysis["alerts"]:
                    print("\n⚠️ Alerts:")
                    for alert in analysis["alerts"]:
                        print(f"  - {alert}")
                time.sleep(interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                time.sleep(interval)

# Usage example
if __name__ == "__main__":
    monitor = ElasticsearchMonitor()
    # One-off analysis
    analysis = monitor.analyze_performance()
    print(json.dumps(analysis, indent=2, ensure_ascii=False))
    # Continuous monitoring (run for 1 hour, checking once a minute)
    # monitor.continuous_monitoring(interval=60, duration=3600)
2. Index Performance Optimization
2.1 Index Design Optimization
# Optimized index settings
PUT /optimized_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "index.codec": "best_compression",
    "index.merge.policy.max_merge_at_once": 5,
    "index.merge.policy.segments_per_tier": 5,
    "index.translog.flush_threshold_size": "1gb",
    "index.translog.sync_interval": "30s",
    "index.max_result_window": 50000,
    "index.mapping.total_fields.limit": 2000,
    "index.mapping.depth.limit": 20,
    "index.mapping.nested_fields.limit": 100
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "keyword",
        "store": true
      },
      "title": {
        "type": "text",
        "analyzer": "standard",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "content": {
        "type": "text",
        "analyzer": "standard",
        "index_options": "positions"
      },
      "tags": {
        "type": "keyword"
      },
      "created_at": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "metadata": {
        "type": "object",
        "enabled": false
      }
    }
  }
}
# Cluster-level tuning for bulk indexing
# (note: indices.memory.index_buffer_size is a static node setting and belongs
#  in elasticsearch.yml, e.g. indices.memory.index_buffer_size: 20%)
PUT /_cluster/settings
{
  "transient": {
    "cluster.routing.allocation.node_concurrent_recoveries": 4,
    "cluster.routing.allocation.cluster_concurrent_rebalance": 2
  }
}
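Transient settings disappear on a full cluster restart, but it is cleaner to reset them explicitly (by setting them to null) once the bulk load is finished:
# Reset the transient settings after bulk loading
PUT /_cluster/settings
{
  "transient": {
    "cluster.routing.allocation.node_concurrent_recoveries": null,
    "cluster.routing.allocation.cluster_concurrent_rebalance": null
  }
}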
2.2 Bulk Indexing Optimization
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk, streaming_bulk
import time
from typing import List, Dict, Any, Generator

class OptimizedIndexer:
    def __init__(self, hosts: List[str] = None):
        self.es = Elasticsearch(
            hosts or ['localhost:9200'],
            # Connection pool tuning
            maxsize=25,
            max_retries=3,
            retry_on_timeout=True,
            # Default request timeout (seconds)
            timeout=30
        )

    def prepare_for_bulk_indexing(self, index_name: str):
        """Prepare an index for bulk loading."""
        # Disable refresh and replicas for the duration of the load
        self.es.indices.put_settings(
            index=index_name,
            body={
                "refresh_interval": "-1",
                "number_of_replicas": 0
            }
        )

    def restore_after_bulk_indexing(self, index_name: str):
        """Restore normal settings after bulk loading."""
        self.es.indices.put_settings(
            index=index_name,
            body={
                "refresh_interval": "30s",
                "number_of_replicas": 1
            }
        )
        # Make the newly indexed documents searchable
        self.es.indices.refresh(index=index_name)
        # Force-merge segments (can take a long time on large indices)
        self.es.indices.forcemerge(
            index=index_name,
            max_num_segments=1
        )

    def generate_docs(self, data_source: List[Dict], index_name: str) -> Generator:
        """Yield bulk actions for the given documents."""
        for doc in data_source:
            yield {
                "_index": index_name,
                "_source": doc
            }

    def bulk_index_optimized(self, data_source: List[Dict], index_name: str) -> Dict[str, Any]:
        """Bulk index with parallel worker threads."""
        start_time = time.time()
        try:
            # Prepare for bulk loading
            self.prepare_for_bulk_indexing(index_name)
            success_count = 0
            error_count = 0
            # parallel_bulk distributes chunks across a thread pool
            for success, info in parallel_bulk(
                self.es,
                self.generate_docs(data_source, index_name),
                chunk_size=1000,
                thread_count=4,
                max_chunk_bytes=10 * 1024 * 1024,  # 10MB
                request_timeout=60,
                raise_on_error=False  # yield failures instead of raising
            ):
                if success:
                    success_count += 1
                else:
                    error_count += 1
                    print(f"Indexing error: {info}")
            # Restore settings
            self.restore_after_bulk_indexing(index_name)
            elapsed_time = time.time() - start_time
            return {
                "success": True,
                "indexed_count": success_count,
                "error_count": error_count,
                "elapsed_time": elapsed_time,
                "docs_per_second": len(data_source) / elapsed_time
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "elapsed_time": time.time() - start_time
            }

    def streaming_bulk_index(self, data_generator: Generator, index_name: str) -> Dict[str, Any]:
        """Stream documents from a generator without holding them all in memory."""
        start_time = time.time()
        success_count = 0
        error_count = 0
        try:
            self.prepare_for_bulk_indexing(index_name)

            def doc_generator():
                for doc in data_generator:
                    yield {
                        "_index": index_name,
                        "_source": doc
                    }

            # streaming_bulk yields one (ok, item) pair per document
            for success, info in streaming_bulk(
                self.es,
                doc_generator(),
                chunk_size=500,
                max_chunk_bytes=5 * 1024 * 1024,
                request_timeout=60,
                max_retries=3,
                initial_backoff=2,
                max_backoff=600,
                raise_on_error=False  # yield failures instead of raising
            ):
                if success:
                    success_count += 1
                else:
                    error_count += 1
                    print(f"Indexing error: {info}")
            self.restore_after_bulk_indexing(index_name)
            elapsed_time = time.time() - start_time
            return {
                "success": True,
                "indexed_count": success_count,
                "error_count": error_count,
                "elapsed_time": elapsed_time
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "elapsed_time": time.time() - start_time
            }

# Usage example
if __name__ == "__main__":
    indexer = OptimizedIndexer()
    # Sample data
    sample_data = [
        {
            "id": f"doc_{i}",
            "title": f"Document {i}",
            "content": f"This is the content of document {i}" * 10,
            "tags": ["tag1", "tag2"],
            "created_at": "2024-01-01 12:00:00"
        }
        for i in range(10000)
    ]
    # Bulk index
    result = indexer.bulk_index_optimized(sample_data, "test_index")
    print(f"Bulk indexing result: {result}")
3. Query Performance Optimization
3.1 Query Optimization Tips
# Use filters instead of queries: filter clauses skip scoring and are cacheable
GET /products/_search
{
  "query": {
    "bool": {
      "filter": [
        {"term": {"category.keyword": "electronics"}},
        {"range": {"price": {"gte": 100, "lte": 1000}}}
      ]
    }
  }
}
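When a query is slower than expected, the Profile API shows where the time goes, broken down per shard and per query component. Profiling adds overhead, so treat it as a diagnostic tool rather than something to run on production traffic. A minimal example against the same index:
# Profile a query to see per-component execution time
GET /products/_search
{
  "profile": true,
  "query": {
    "bool": {
      "filter": [
        {"term": {"category.keyword": "electronics"}}
      ]
    }
  }
}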
# Use constant_score to avoid scoring altogether
GET /products/_search
{
  "query": {
    "constant_score": {
      "filter": {
        "bool": {
          "must": [
            {"term": {"status.keyword": "active"}},
            {"range": {"created_at": {"gte": "2024-01-01"}}}
          ]
        }
      },
      "boost": 1.0
    }
  }
}
# Limit the fields returned
GET /products/_search
{
  "_source": ["id", "title", "price"],
  "query": {
    "match": {"title": "laptop"}
  }
}
# Use search_after for deep pagination
GET /products/_search
{
  "size": 100,
  "sort": [
    {"created_at": "desc"},
    {"_id": "desc"}
  ],
  "search_after": ["2024-01-01T12:00:00", "doc_123"]
}
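On Elasticsearch 7.10 and later, search_after is normally paired with a point-in-time (PIT) so that every page sees the same snapshot of the index. A minimal sketch (the PIT id below is a placeholder for the id returned by the first call):
# Open a point in time; the response contains an "id"
POST /products/_pit?keep_alive=1m
# Search against the PIT (no index in the path); Elasticsearch adds
# an implicit _shard_doc tiebreaker to the sort
GET /_search
{
  "size": 100,
  "pit": {"id": "<pit_id_from_above>", "keep_alive": "1m"},
  "sort": [{"created_at": "desc"}]
}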
# Use scroll to sweep through a large result set
GET /products/_search?scroll=5m
{
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
# Continue the scroll
GET /_search/scroll
{
  "scroll": "5m",
  "scroll_id": "scroll_id_here"
}
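Each open scroll holds a search context on the cluster until it expires, so delete it as soon as the sweep is done:
# Release the scroll context explicitly
DELETE /_search/scroll
{
  "scroll_id": "scroll_id_here"
}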
3.2 Query Cache Optimization
# Enable the query cache (a static index setting: set it at index creation
# or on a closed index; it is enabled by default)
PUT /my_index/_settings
{
  "index.queries.cache.enabled": true
}
# Use the shard request cache (size:0 aggregation requests are cached by default)
GET /my_index/_search?request_cache=true
{
  "size": 0,
  "aggs": {
    "categories": {
      "terms": {
        "field": "category.keyword"
      }
    }
  }
}
# Clear the query cache
POST /my_index/_cache/clear?query=true
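Cache hit ratios show whether the caches are actually earning their keep; both are exposed through the stats APIs:
# Per-index query cache and request cache statistics
GET /my_index/_stats/query_cache,request_cache
# Node-level query cache statistics
GET /_nodes/stats/indices/query_cache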
4. Memory and JVM Optimization
4.1 JVM Configuration Optimization
# jvm.options example
# Heap size (keep Xms and Xmx equal; stay below ~32GB to keep compressed
# object pointers, and leave roughly half of system RAM to the OS page cache)
-Xms16g
-Xmx16g
# Garbage collector
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=200
-XX:+UnlockExperimentalVMOptions
-XX:G1NewSizePercent=30
-XX:G1MaxNewSizePercent=40
# GC logging (JDK 9+ unified logging, with built-in rotation)
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:time,level,tags:filecount=32,filesize=64m
# Miscellaneous
-Djava.io.tmpdir=/tmp
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log
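After a restart, confirm that the heap settings actually took effect; the _cat API gives a one-line answer per node:
# Verify heap size and current usage per node
GET /_cat/nodes?v&h=name,heap.max,heap.percent,ram.max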
4.2 Memory Usage Monitoring
import psutil
import requests
from typing import Dict, Any

class MemoryMonitor:
    def __init__(self, es_host: str = "localhost", es_port: int = 9200):
        self.es_url = f"http://{es_host}:{es_port}"

    def get_system_memory(self) -> Dict[str, Any]:
        """System-wide memory usage."""
        memory = psutil.virtual_memory()
        return {
            "total_gb": memory.total / (1024**3),
            "available_gb": memory.available / (1024**3),
            "used_gb": memory.used / (1024**3),
            "percent": memory.percent
        }

    def get_es_memory_usage(self) -> Dict[str, Any]:
        """Per-node JVM memory and GC statistics from Elasticsearch."""
        try:
            response = requests.get(f"{self.es_url}/_nodes/stats/jvm")
            data = response.json()
            memory_info = {}
            for node_id, node_data in data["nodes"].items():
                node_name = node_data["name"]
                jvm = node_data["jvm"]
                memory_info[node_name] = {
                    "heap_used_gb": jvm["mem"]["heap_used_in_bytes"] / (1024**3),
                    "heap_max_gb": jvm["mem"]["heap_max_in_bytes"] / (1024**3),
                    "heap_used_percent": jvm["mem"]["heap_used_percent"],
                    "non_heap_used_gb": jvm["mem"]["non_heap_used_in_bytes"] / (1024**3),
                    "gc_young_count": jvm["gc"]["collectors"]["young"]["collection_count"],
                    "gc_young_time_ms": jvm["gc"]["collectors"]["young"]["collection_time_in_millis"],
                    "gc_old_count": jvm["gc"]["collectors"]["old"]["collection_count"],
                    "gc_old_time_ms": jvm["gc"]["collectors"]["old"]["collection_time_in_millis"]
                }
            return memory_info
        except Exception as e:
            return {"error": str(e)}

    def analyze_memory_usage(self) -> Dict[str, Any]:
        """Combine system and Elasticsearch memory data and derive recommendations."""
        system_memory = self.get_system_memory()
        es_memory = self.get_es_memory_usage()
        analysis = {
            "system_memory": system_memory,
            "elasticsearch_memory": es_memory,
            "recommendations": []
        }
        # Derive recommendations
        if system_memory["percent"] > 85:
            analysis["recommendations"].append("System memory usage is high; consider adding RAM or optimizing queries")
        for node_name, node_memory in es_memory.items():
            if isinstance(node_memory, dict):
                if node_memory["heap_used_percent"] > 85:
                    analysis["recommendations"].append(f"Node {node_name}: JVM heap usage is high")
                # Check cumulative GC time
                total_gc_time = node_memory["gc_young_time_ms"] + node_memory["gc_old_time_ms"]
                if total_gc_time > 10000:  # 10 seconds
                    analysis["recommendations"].append(f"Node {node_name}: GC time is high; consider tuning JVM parameters")
        return analysis

# Usage example
if __name__ == "__main__":
    monitor = MemoryMonitor()
    analysis = monitor.analyze_memory_usage()
    print("Memory usage analysis:")
    print(f"System memory: {analysis['system_memory']['used_gb']:.2f}GB / {analysis['system_memory']['total_gb']:.2f}GB ({analysis['system_memory']['percent']:.1f}%)")
    print("\nElasticsearch memory usage:")
    for node_name, node_memory in analysis["elasticsearch_memory"].items():
        if isinstance(node_memory, dict):
            print(f"  {node_name}:")
            print(f"    Heap: {node_memory['heap_used_gb']:.2f}GB / {node_memory['heap_max_gb']:.2f}GB ({node_memory['heap_used_percent']}%)")
            print(f"    Non-heap: {node_memory['non_heap_used_gb']:.2f}GB")
            print(f"    GC: young({node_memory['gc_young_count']} collections, {node_memory['gc_young_time_ms']}ms), old({node_memory['gc_old_count']} collections, {node_memory['gc_old_time_ms']}ms)")
    if analysis["recommendations"]:
        print("\nRecommendations:")
        for rec in analysis["recommendations"]:
            print(f"  - {rec}")
5. Disk and Storage Optimization
5.1 Storage Configuration Optimization
# Filesystem: mount the data path with noatime (add to /etc/fstab)
/dev/sdb1 /var/lib/elasticsearch ext4 defaults,noatime,nodiratime 0 2
# I/O scheduler: for SSDs use noop ("none" on multi-queue kernels)
echo noop > /sys/block/sdb/queue/scheduler
# Keep swapping to an absolute minimum
echo 1 > /proc/sys/vm/swappiness
# File descriptor limits (add to /etc/security/limits.conf)
elasticsearch soft nofile 65536
elasticsearch hard nofile 65536
elasticsearch soft nproc 4096
elasticsearch hard nproc 4096
# Virtual memory: mmap count required by Elasticsearch
echo 262144 > /proc/sys/vm/max_map_count
# To persist the sysctl values across reboots, add to /etc/sysctl.conf:
#   vm.swappiness=1
#   vm.max_map_count=262144
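Elasticsearch reports the process limits it actually received at startup, which is a quick way to verify that the file descriptor settings above took effect:
# Check the file descriptor limit each node is running with
GET /_nodes/stats/process?filter_path=**.max_file_descriptors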
5.2 Index Storage Optimization
# Use the best_compression codec (smaller on disk, slightly slower to decode)
PUT /compressed_index
{
  "settings": {
    "index.codec": "best_compression",
    "number_of_shards": 1,
    "number_of_replicas": 0
  }
}
# Hot-cold data separation: make the cluster aware of the custom node attribute
# (the require filters below also work with node.attr directly)
PUT /_cluster/settings
{
  "persistent": {
    "cluster.routing.allocation.awareness.attributes": "temperature"
  }
}
# Hot node configuration (elasticsearch.yml)
node.attr.temperature: hot
# Cold node configuration (elasticsearch.yml)
node.attr.temperature: cold
# Pin an index to hot nodes
PUT /hot_index/_settings
{
  "index.routing.allocation.require.temperature": "hot"
}
# Migrate an index to cold nodes
PUT /old_index/_settings
{
  "index.routing.allocation.require.temperature": "cold"
}
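The setting change only marks the index; the shards relocate in the background, and _cat/shards shows where they actually landed:
# Verify which nodes now hold the index's shards
GET /_cat/shards/old_index?v&h=index,shard,prirep,state,node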
6. Network and Connection Optimization
6.1 Network Configuration Optimization
# elasticsearch.yml network settings
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
# HTTP layer
http.max_content_length: 100mb
http.max_header_size: 8kb
http.max_initial_line_length: 4kb
http.compression: true
http.compression_level: 6
# Transport layer (transport.tcp.compress was the pre-7.x name)
transport.compress: true
# Thread pools
thread_pool.write.size: 8
thread_pool.write.queue_size: 200
thread_pool.search.size: 13
thread_pool.search.queue_size: 1000
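Rejections in these pools are the clearest sign that they are undersized; check them with:
# Active, queued, and rejected tasks per thread pool and node
GET /_cat/thread_pool/search,write?v&h=node_name,name,active,queue,rejected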
6.2 Client Connection Optimization
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

class OptimizedESClient:
    def __init__(self, hosts: list):
        # Connection pool and retry configuration (elasticsearch-py 7.x parameters)
        self.es = Elasticsearch(
            hosts,
            maxsize=25,                        # connection pool size
            max_retries=3,                     # maximum retries per request
            retry_on_timeout=True,             # retry on timeouts
            retry_on_status=(502, 503, 504),   # retry on these HTTP statuses
            timeout=30,                        # default request timeout (seconds)
            http_compress=True,                # compress request/response bodies
            verify_certs=False                 # dev only; keep verification on in production
        )

    def bulk_index_with_retry(self, docs: list, index: str):
        """Bulk index with built-in retry/backoff."""
        def generate_docs():
            for doc in docs:
                yield {
                    '_index': index,
                    '_source': doc
                }
        try:
            # bulk() returns a (success_count, errors) tuple
            success, failed = bulk(
                self.es,
                generate_docs(),
                chunk_size=1000,
                max_chunk_bytes=10*1024*1024,  # 10MB
                request_timeout=60,
                max_retries=3,
                initial_backoff=2,
                max_backoff=600
            )
            return {'success': success, 'failed': failed}
        except Exception as e:
            return {'error': str(e)}

    def search_with_fallback(self, index: str, query: dict, fallback_query: dict = None):
        """Search with a degraded fallback query."""
        try:
            # Try the primary query first
            response = self.es.search(
                index=index,
                body=query,
                timeout='30s'
            )
            return response
        except Exception as e:
            if fallback_query:
                try:
                    # Fall back to the simpler query
                    response = self.es.search(
                        index=index,
                        body=fallback_query,
                        timeout='10s'
                    )
                    response['_fallback'] = True
                    return response
                except Exception as fallback_error:
                    return {'error': f'Primary: {e}, Fallback: {fallback_error}'}
            else:
                return {'error': str(e)}

# Usage example
if __name__ == "__main__":
    client = OptimizedESClient(['localhost:9200'])
    # Complex query
    complex_query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"title": "elasticsearch"}},
                    {"range": {"date": {"gte": "2024-01-01"}}}
                ]
            }
        },
        "aggs": {
            "categories": {
                "terms": {"field": "category.keyword"}
            }
        }
    }
    # Simple fallback query
    fallback_query = {
        "query": {
            "match": {"title": "elasticsearch"}
        }
    }
    result = client.search_with_fallback(
        "my_index",
        complex_query,
        fallback_query
    )
    if '_fallback' in result:
        print("Fallback query was used")
    total = result.get('hits', {}).get('total', 0)
    if isinstance(total, dict):  # ES 7+ returns {"value": N, "relation": "eq"}
        total = total.get('value', 0)
    print(f"Search returned {total} hits")
Chapter Summary
This chapter covered Elasticsearch performance optimization and tuning in detail:
- Performance monitoring: cluster health monitoring, performance metric analysis, and continuous monitoring
- Index optimization: index design, bulk indexing, and storage optimization techniques
- Query optimization: query tuning, cache usage, and pagination strategies
- Memory optimization: JVM configuration, memory monitoring, and garbage collection tuning
- Storage optimization: disk configuration, filesystem tuning, and hot-cold data separation
- Network optimization: network configuration and client connection tuning
The next chapter covers Elasticsearch cluster management and operations.
Exercises
- Design a performance monitoring system that tracks the cluster's key metrics in real time
- Optimize the bulk import performance of a large index
- Analyze and fix the performance problem behind a slow query
- Configure JVM and system parameters for a production environment