1. 集群架构与规划

1.1 集群角色规划

# Dedicated master-eligible node (elasticsearch.yml)
# Carries only the `master` role; the seed hosts and the initial master
# list below name the same three master nodes.
cluster.name: production-cluster
node.name: master-node-1
node.roles: [master]
network.host: 10.0.1.10
discovery.seed_hosts: ["10.0.1.10", "10.0.1.11", "10.0.1.12"]
cluster.initial_master_nodes: ["master-node-1", "master-node-2", "master-node-3"]

# Data node (also handles ingest pipelines)
# The node.attr.* values are custom attributes (rack, temperature) attached
# to this node.
# NOTE(review): listing more than one path.data entry is, as far as I know,
# deprecated in recent Elasticsearch releases - confirm before relying on it.
cluster.name: production-cluster
node.name: data-node-1
node.roles: [data, ingest]
network.host: 10.0.2.10
path.data: ["/data1/elasticsearch", "/data2/elasticsearch"]
node.attr.rack: rack1
node.attr.temperature: hot

# Coordinating-only node
# An empty role list leaves the node with no master/data/ingest role.
cluster.name: production-cluster
node.name: coordinating-node-1
node.roles: []
network.host: 10.0.3.10
http.port: 9200

# Machine-learning node
# Carries the `ml` and `remote_cluster_client` roles and enables xpack.ml.
cluster.name: production-cluster
node.name: ml-node-1
node.roles: [ml, remote_cluster_client]
network.host: 10.0.4.10
xpack.ml.enabled: true

1.2 集群容量规划

import math
from typing import Dict, Any, List

class ClusterCapacityPlanner:
    """Rule-of-thumb capacity planner for an Elasticsearch cluster.

    Sizing is driven by the tunable attributes set in ``__init__`` (target
    shard size, heap-to-disk ratio, replica count, growth headroom) and by
    the node counts of the planned topology.
    """

    def __init__(self, data_nodes: int = 3, master_nodes: int = 3, coordinating_nodes: int = 2):
        """Create a planner.

        Args:
            data_nodes: Number of data nodes the data is spread over (>= 1).
            master_nodes: Number of dedicated master nodes.
            coordinating_nodes: Number of coordinating-only nodes.

        Raises:
            ValueError: If ``data_nodes`` is smaller than 1.
        """
        if data_nodes < 1:
            raise ValueError("data_nodes must be at least 1")

        self.shard_size_gb = 30  # target size of a single shard, in GB
        self.heap_to_disk_ratio = 0.03  # JVM heap per GB of data on disk
        self.replica_factor = 1  # replica copies per primary shard
        self.growth_factor = 1.5  # headroom multiplier on projected data
        self.data_nodes = data_nodes
        self.master_nodes = master_nodes
        self.coordinating_nodes = coordinating_nodes

    def calculate_shard_count(self, data_size_gb: float, daily_growth_gb: float, retention_days: int) -> Dict[str, Any]:
        """Derive primary/total shard counts from current size and growth.

        Args:
            data_size_gb: Primary data currently stored, in GB.
            daily_growth_gb: Primary data added per day, in GB.
            retention_days: Number of days of growth to plan for.

        Returns:
            Dict with the current and projected data size, the recommended
            number of primary shards, the total number of shards including
            replicas, and the target shard size.

        Raises:
            ValueError: If any argument is negative.
        """
        if data_size_gb < 0 or daily_growth_gb < 0 or retention_days < 0:
            raise ValueError("data size, daily growth and retention must be non-negative")

        # Projected primary data: what exists now plus the retained growth,
        # scaled by the growth headroom.
        total_data_gb = data_size_gb + (daily_growth_gb * retention_days)
        total_data_with_growth = total_data_gb * self.growth_factor

        # An index always has at least one primary shard, even when empty.
        primary_shards = max(1, math.ceil(total_data_with_growth / self.shard_size_gb))

        # Every primary is accompanied by `replica_factor` replica copies.
        total_shards = primary_shards * (1 + self.replica_factor)

        return {
            "current_data_gb": data_size_gb,
            "projected_data_gb": total_data_with_growth,
            "recommended_primary_shards": primary_shards,
            "total_shards": total_shards,
            "shard_size_gb": self.shard_size_gb
        }

    def calculate_node_requirements(self, total_data_gb: float, shard_count: int) -> Dict[str, Any]:
        """Derive per-node storage, memory and CPU requirements.

        Args:
            total_data_gb: Projected *primary* data size in GB. Replica
                copies are added here, because they occupy disk as well.
            shard_count: Total number of shards (primaries plus replicas).

        Returns:
            Dict with the node counts of the topology and the per-data-node
            storage, heap, total memory and CPU core figures.
        """
        # Disk actually occupied: each primary byte is stored once per copy.
        stored_data_gb = total_data_gb * (1 + self.replica_factor)
        storage_per_node_gb = stored_data_gb / self.data_nodes

        # Heap follows the on-disk volume; total RAM is twice the heap so
        # that the heap never exceeds 50% of the machine's memory.
        heap_memory_gb = stored_data_gb * self.heap_to_disk_ratio
        heap_per_node_gb = heap_memory_gb / self.data_nodes
        total_memory_per_node_gb = heap_per_node_gb * 2

        # At least 8 cores per node, otherwise one core per hosted shard.
        cpu_cores_per_node = max(8, shard_count // self.data_nodes)

        return {
            "data_nodes": self.data_nodes,
            "master_nodes": self.master_nodes,
            "coordinating_nodes": self.coordinating_nodes,
            "storage_per_data_node_gb": storage_per_node_gb,
            "heap_memory_per_node_gb": heap_per_node_gb,
            "total_memory_per_node_gb": total_memory_per_node_gb,
            "cpu_cores_per_node": cpu_cores_per_node
        }

    def generate_capacity_plan(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """Build the full capacity report.

        Args:
            requirements: May contain ``data_size_gb`` (default 1000),
                ``daily_growth_gb`` (default 10) and ``retention_days``
                (default 90).

        Returns:
            Dict with ``shard_planning``, ``node_requirements``,
            ``recommendations`` (list of advisory strings) and
            ``estimated_cost``.
        """
        data_size_gb = requirements.get("data_size_gb", 1000)
        daily_growth_gb = requirements.get("daily_growth_gb", 10)
        retention_days = requirements.get("retention_days", 90)

        shard_info = self.calculate_shard_count(data_size_gb, daily_growth_gb, retention_days)

        node_info = self.calculate_node_requirements(
            shard_info["projected_data_gb"],
            shard_info["total_shards"]
        )

        # Advisory messages for figures that exceed the planner's limits.
        recommendations = []

        if shard_info["recommended_primary_shards"] > 1000:
            recommendations.append("分片数量过多,考虑增加分片大小或使用时间序列索引")

        if node_info["heap_memory_per_node_gb"] > 30:
            recommendations.append("堆内存需求过大,考虑增加节点数量")

        if node_info["storage_per_data_node_gb"] > 2000:
            recommendations.append("单节点存储需求过大,考虑增加数据节点")

        return {
            "shard_planning": shard_info,
            "node_requirements": node_info,
            "recommendations": recommendations,
            "estimated_cost": self._estimate_cost(node_info)
        }

    def _estimate_cost(self, node_info: Dict[str, Any]) -> Dict[str, Any]:
        """Estimate monthly and annual cost from the node counts.

        The per-node prices are simplified placeholders; real figures should
        come from the hosting provider's price list.
        """
        data_node_cost = 500  # per data node per month
        master_node_cost = 200  # per master node per month
        coord_node_cost = 300  # per coordinating node per month

        monthly_cost = (
            node_info["data_nodes"] * data_node_cost +
            node_info["master_nodes"] * master_node_cost +
            node_info["coordinating_nodes"] * coord_node_cost
        )

        return {
            "monthly_cost_usd": monthly_cost,
            "annual_cost_usd": monthly_cost * 12
        }

# Usage example
if __name__ == "__main__":
    planner = ClusterCapacityPlanner()

    # Business requirements: 2 TB today, +50 GB per day, 180 days retained
    requirements = {"data_size_gb": 2000, "daily_growth_gb": 50, "retention_days": 180}

    plan = planner.generate_capacity_plan(requirements)
    shards = plan['shard_planning']
    nodes = plan['node_requirements']
    cost = plan['estimated_cost']

    # Collect the report line by line and emit it in one go.
    report = [
        "=== 集群容量规划报告 ===",
        "\n分片规划:",
        f"  当前数据量: {shards['current_data_gb']} GB",
        f"  预计数据量: {shards['projected_data_gb']:.0f} GB",
        f"  推荐主分片数: {shards['recommended_primary_shards']}",
        f"  总分片数: {shards['total_shards']}",
        "\n节点需求:",
        f"  数据节点: {nodes['data_nodes']} 个",
        f"  主节点: {nodes['master_nodes']} 个",
        f"  协调节点: {nodes['coordinating_nodes']} 个",
        f"  每个数据节点存储: {nodes['storage_per_data_node_gb']:.0f} GB",
        f"  每个节点堆内存: {nodes['heap_memory_per_node_gb']:.1f} GB",
        f"  每个节点总内存: {nodes['total_memory_per_node_gb']:.1f} GB",
        f"  每个节点CPU核心: {nodes['cpu_cores_per_node']} 个",
        "\n成本估算:",
        f"  月度成本: ${cost['monthly_cost_usd']}",
        f"  年度成本: ${cost['annual_cost_usd']}",
    ]

    # Recommendations are only listed when the planner produced any.
    if plan['recommendations']:
        report.append("\n优化建议:")
        report.extend(f"  - {rec}" for rec in plan['recommendations'])

    print("\n".join(report))

2. 集群部署与配置

2.1 自动化部署脚本

#!/bin/bash
# elasticsearch_cluster_deploy.sh
#
# Deploys an Elasticsearch cluster over SSH: the functions below tune each
# host, install the tarball, write the node configuration and start the
# services. The constants here describe the target cluster.

# Abort on the first failing command.
set -o errexit

# Cluster identity and Elasticsearch release to install
CLUSTER_NAME='production-cluster'
ES_VERSION='8.11.0'

# Host lists, one array per node group
NODES=(10.0.1.10 10.0.1.11 10.0.1.12)       # master nodes
DATA_NODES=(10.0.2.10 10.0.2.11 10.0.2.12)  # data nodes
COORD_NODES=(10.0.3.10 10.0.3.11)           # coordinating nodes

# System tuning
#######################################
# Applies the OS prerequisites for Elasticsearch on one remote host:
# resource limits, vm.max_map_count, swap off, I/O scheduler, service user
# and data/log directories. Safe to run repeatedly: lines are appended to
# system files only when missing and the user is created only once.
# Arguments:
#   $1 - host to configure (reached as root over SSH)
# Outputs:
#   Progress message and remote command output on stdout.
# Returns:
#   Exit status of ssh (status of the last remote command).
#######################################
optimize_system() {
    local node_ip=$1
    echo "优化系统配置: $node_ip"
    
    # Quoted delimiter: nothing below is expanded locally.
    ssh "root@${node_ip}" << 'EOF'
        # Append line $1 to file $2 unless that exact line is already there
        append_once() {
            grep -qxF -- "$1" "$2" 2>/dev/null || echo "$1" >> "$2"
        }

        # 设置文件描述符限制
        append_once "elasticsearch soft nofile 65536" /etc/security/limits.conf
        append_once "elasticsearch hard nofile 65536" /etc/security/limits.conf
        append_once "elasticsearch soft nproc 4096" /etc/security/limits.conf
        append_once "elasticsearch hard nproc 4096" /etc/security/limits.conf
        
        # 设置虚拟内存
        append_once "vm.max_map_count=262144" /etc/sysctl.conf
        sysctl -p
        
        # 禁用交换分区
        swapoff -a
        sed -i '/ swap / s/^/#/' /etc/fstab
        
        # 设置磁盘调度器
        # Kernels using the multi-queue block layer offer "none" instead of
        # "noop"; pick whichever the device actually lists.
        sched=/sys/block/sdb/queue/scheduler
        if [ -w "$sched" ]; then
            if grep -qw noop "$sched"; then
                echo noop > "$sched"
            elif grep -qw none "$sched"; then
                echo none > "$sched"
            fi
        fi
        
        # 创建elasticsearch用户
        id -u elasticsearch >/dev/null 2>&1 || useradd -m -s /bin/bash elasticsearch
        
        # 创建数据目录
        mkdir -p /data1/elasticsearch /data2/elasticsearch
        mkdir -p /var/log/elasticsearch
        chown -R elasticsearch:elasticsearch /data1 /data2 /var/log/elasticsearch
EOF
}

# Elasticsearch installation
#######################################
# Downloads the Elasticsearch tarball on a remote host, unpacks it to
# /opt/elasticsearch and installs/enables a systemd unit for it.
# Globals:
#   ES_VERSION (read) - release to download
# Arguments:
#   $1 - host to install on (reached as root over SSH)
#   $2 - node type, used for the progress message only
#   $3 - node name (accepted for symmetry with configure_node, unused here)
# Outputs:
#   Progress message and remote command output on stdout.
# Returns:
#   Exit status of ssh; non-zero as soon as any remote step fails.
#######################################
install_elasticsearch() {
    local node_ip=$1
    local node_type=$2
    local node_name=$3
    
    echo "安装Elasticsearch: $node_ip ($node_type)"
    
    # Unquoted delimiter: ${ES_VERSION} is expanded locally before sending.
    ssh "root@${node_ip}" << EOF
        # Stop at the first failing step so a broken download is not hidden
        # by the commands that follow it.
        set -e

        # 下载并安装Elasticsearch
        cd /tmp
        wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ES_VERSION}-linux-x86_64.tar.gz
        tar -xzf elasticsearch-${ES_VERSION}-linux-x86_64.tar.gz
        mv elasticsearch-${ES_VERSION} /opt/elasticsearch
        chown -R elasticsearch:elasticsearch /opt/elasticsearch
        
        # 创建systemd服务文件
        cat > /etc/systemd/system/elasticsearch.service << 'SYSTEMD_EOF'
[Unit]
Description=Elasticsearch
Documentation=https://www.elastic.co
Wants=network-online.target
After=network-online.target
ConditionNetwork=true

[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/opt/elasticsearch
Environment=ES_PATH_CONF=/opt/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
Environment=ES_SD_NOTIFY=true
EnvironmentFile=-/etc/default/elasticsearch

WorkingDirectory=/opt/elasticsearch

User=elasticsearch
Group=elasticsearch

ExecStart=/opt/elasticsearch/bin/elasticsearch

StandardOutput=journal
StandardError=inherit

SyslogIdentifier=elasticsearch

LimitNOFILE=65535
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
# Required because the node configuration sets bootstrap.memory_lock: true
LimitMEMLOCK=infinity
KillMode=mixed
KillSignal=SIGTERM
SendSIGKILL=no
SuccessExitStatus=143
TimeoutStopSec=0

[Install]
WantedBy=multi-user.target
SYSTEMD_EOF
        
        systemctl daemon-reload
        systemctl enable elasticsearch
EOF
}

# Node configuration
#######################################
# Writes elasticsearch.yml and jvm.options on one remote host.
# Globals:
#   CLUSTER_NAME, NODES, DATA_NODES, COORD_NODES (read)
#   ES_HEAP_SIZE (read, optional) - JVM heap size, defaults to 16g
# Arguments:
#   $1 - host to configure (reached as root over SSH)
#   $2 - node type: master | data | coordinating
#   $3 - node name written to node.name
# Outputs:
#   Progress message and remote command output on stdout; an error message
#   on stderr for an unknown node type.
# Returns:
#   1 for an unknown node type, otherwise the exit status of ssh.
#######################################
configure_node() {
    local node_ip=$1
    local node_type=$2
    local node_name=$3
    local node_roles seed_hosts master_nodes node i
    local heap_size="${ES_HEAP_SIZE:-16g}"
    
    echo "配置节点: $node_ip ($node_type)"
    
    # Map the node type onto the node.roles value
    case "$node_type" in
        "master")
            node_roles="[master]"
            ;;
        "data")
            node_roles="[data, ingest]"
            ;;
        "coordinating")
            node_roles="[]"
            ;;
        *)
            echo "unknown node type: $node_type" >&2
            return 1
            ;;
    esac
    
    # Build discovery.seed_hosts from every host of every group
    seed_hosts=""
    for node in "${NODES[@]}" "${DATA_NODES[@]}" "${COORD_NODES[@]}"; do
        seed_hosts="$seed_hosts\"$node\", "
    done
    seed_hosts="[${seed_hosts%, }]"
    
    # Build cluster.initial_master_nodes: master-node-1..N, one per NODES entry
    master_nodes=""
    for i in "${!NODES[@]}"; do
        master_nodes="$master_nodes\"master-node-$((i+1))\", "
    done
    master_nodes="[${master_nodes%, }]"
    
    # The outer delimiter is unquoted, so every ${...} below is expanded
    # locally before the text is sent; the quoted inner delimiters only
    # matter to the remote shell.
    ssh "root@${node_ip}" << EOF
        # 备份原配置
        cp /opt/elasticsearch/config/elasticsearch.yml /opt/elasticsearch/config/elasticsearch.yml.bak
        
        # 生成新配置
        cat > /opt/elasticsearch/config/elasticsearch.yml << 'CONFIG_EOF'
# 集群配置
cluster.name: ${CLUSTER_NAME}
node.name: ${node_name}
node.roles: ${node_roles}

# 网络配置
network.host: ${node_ip}
http.port: 9200
transport.port: 9300

# 发现配置
discovery.seed_hosts: ${seed_hosts}
cluster.initial_master_nodes: ${master_nodes}

# 路径配置
path.data: ["/data1/elasticsearch", "/data2/elasticsearch"]
path.logs: "/var/log/elasticsearch"

# 内存配置
bootstrap.memory_lock: true

# 安全配置
xpack.security.enabled: false
xpack.security.transport.ssl.enabled: false
xpack.security.http.ssl.enabled: false

# 监控配置
xpack.monitoring.collection.enabled: true

# 其他配置
action.destructive_requires_name: true
cluster.routing.allocation.disk.threshold_enabled: true
cluster.routing.allocation.disk.watermark.low: 85%
cluster.routing.allocation.disk.watermark.high: 90%
cluster.routing.allocation.disk.watermark.flood_stage: 95%
CONFIG_EOF
        
        # 配置JVM
        cat > /opt/elasticsearch/config/jvm.options << 'JVM_EOF'
# JVM堆内存设置
-Xms${heap_size}
-Xmx${heap_size}

# GC设置
-XX:+UseG1GC
-XX:G1HeapRegionSize=16m
-XX:MaxGCPauseMillis=200
-XX:+UnlockExperimentalVMOptions
-XX:G1NewSizePercent=30
-XX:G1MaxNewSizePercent=40

# GC日志
# Rotation is configured through the -Xlog output options; the separate
# rotation flags of JDK 8 no longer exist and would stop the JVM from starting.
-Xlog:gc*,gc+age=trace,safepoint:gc.log:time,level,tags:filecount=32,filesize=64m

# 错误处理
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/elasticsearch
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log

# 其他优化
-Djava.io.tmpdir=/tmp
-Dfile.encoding=UTF-8
-Djna.nosys=true
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
JVM_EOF
        
        chown -R elasticsearch:elasticsearch /opt/elasticsearch
EOF
}

# Cluster start-up
#######################################
# Starts the elasticsearch service on each given host in turn.
# Arguments:
#   $1  - label printed in front of each host
#   $2  - seconds to pause after each start
#   $3+ - hosts (reached as root over SSH)
#######################################
_start_nodes() {
    local label=$1
    local delay=$2
    shift 2
    local node_ip
    for node_ip in "$@"; do
        echo "${label}: ${node_ip}"
        ssh "root@${node_ip}" "systemctl start elasticsearch"
        sleep "$delay"
    done
}

#######################################
# Starts the whole cluster: master nodes first (10 s apart), then data
# nodes and coordinating nodes (5 s apart).
# Globals:
#   NODES, DATA_NODES, COORD_NODES (read)
# Outputs:
#   One progress line per host on stdout.
#######################################
start_cluster() {
    echo "启动集群..."
    _start_nodes "启动主节点" 10 "${NODES[@]}"
    _start_nodes "启动数据节点" 5 "${DATA_NODES[@]}"
    _start_nodes "启动协调节点" 5 "${COORD_NODES[@]}"
}

# Cluster verification
#######################################
# Waits 30 seconds, then prints cluster health and the node table as
# reported by the first coordinating node.
# Globals:
#   COORD_NODES (read)
# Outputs:
#   Progress message and both API responses on stdout.
#######################################
verify_cluster() {
    local api="http://${COORD_NODES[0]}:9200"

    echo "验证集群状态..."
    sleep 30

    # Cluster health first, then the per-node overview
    curl -s "${api}/_cluster/health?pretty"
    curl -s "${api}/_cat/nodes?v"
}

# Main flow
#######################################
# Installs and configures every host of one node group.
# Arguments:
#   $1  - node type passed to install_elasticsearch / configure_node
#   $2  - node name prefix; hosts are named <prefix>-1, <prefix>-2, ...
#   $3+ - hosts of the group
#######################################
_deploy_group() {
    local node_type=$1
    local name_prefix=$2
    shift 2
    local node_ip node_name
    local idx=0
    for node_ip in "$@"; do
        idx=$((idx + 1))
        node_name="${name_prefix}-${idx}"
        install_elasticsearch "$node_ip" "$node_type" "$node_name"
        configure_node "$node_ip" "$node_type" "$node_name"
    done
}

#######################################
# Runs the whole deployment: tune every host, install and configure the
# master, data and coordinating groups, start the cluster and verify it.
# Globals:
#   NODES, DATA_NODES, COORD_NODES (read)
# Outputs:
#   Progress messages on stdout.
#######################################
main() {
    local node

    echo "开始部署Elasticsearch集群..."
    
    # Tune the operating system on every host
    for node in "${NODES[@]}" "${DATA_NODES[@]}" "${COORD_NODES[@]}"; do
        optimize_system "$node"
    done
    
    # Install and configure each group
    _deploy_group "master" "master-node" "${NODES[@]}"
    _deploy_group "data" "data-node" "${DATA_NODES[@]}"
    _deploy_group "coordinating" "coordinating-node" "${COORD_NODES[@]}"
    
    # Start the cluster
    start_cluster
    
    # Verify the cluster
    verify_cluster
    
    echo "集群部署完成!"
}

# Run the main flow, forwarding the script's command-line arguments
main "$@"

2.2 Docker Compose部署

# docker-compose.yml
# Single-host Elasticsearch cluster: three master nodes, two data nodes,
# one coordinating node published on host port 9200, and Kibana.
# Security and transport TLS are switched off on every Elasticsearch node.
version: '3.8'

services:
  # Master nodes (HTTP published on host ports 9201-9203)
  es-master-1:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-master-1
    environment:
      - node.name=es-master-1
      - cluster.name=docker-cluster
      - node.roles=master
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-master-1-data:/usr/share/elasticsearch/data
      - es-master-1-logs:/usr/share/elasticsearch/logs
    ports:
      - "9201:9200"
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 4g

  es-master-2:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-master-2
    environment:
      - node.name=es-master-2
      - cluster.name=docker-cluster
      - node.roles=master
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-master-2-data:/usr/share/elasticsearch/data
      - es-master-2-logs:/usr/share/elasticsearch/logs
    ports:
      - "9202:9200"
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 4g

  es-master-3:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-master-3
    environment:
      - node.name=es-master-3
      - cluster.name=docker-cluster
      - node.roles=master
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-master-3-data:/usr/share/elasticsearch/data
      - es-master-3-logs:/usr/share/elasticsearch/logs
    ports:
      - "9203:9200"
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 4g

  # Data nodes (no published ports; started after the masters)
  # NOTE(review): cluster.initial_master_nodes is also set on the data and
  # coordinating services below, which are not master-eligible - confirm
  # whether it is needed there.
  es-data-1:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-data-1
    environment:
      - node.name=es-data-1
      - cluster.name=docker-cluster
      - node.roles=data,ingest
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms8g -Xmx8g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-data-1-data:/usr/share/elasticsearch/data
      - es-data-1-logs:/usr/share/elasticsearch/logs
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 16g
    depends_on:
      - es-master-1
      - es-master-2
      - es-master-3

  es-data-2:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-data-2
    environment:
      - node.name=es-data-2
      - cluster.name=docker-cluster
      - node.roles=data,ingest
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms8g -Xmx8g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-data-2-data:/usr/share/elasticsearch/data
      - es-data-2-logs:/usr/share/elasticsearch/logs
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 16g
    depends_on:
      - es-master-1
      - es-master-2
      - es-master-3

  # Coordinating node (client entry point on host port 9200; logs volume only)
  # NOTE(review): verify that "node.roles=[]" passed as an environment
  # variable is parsed as an empty role list rather than a role named "[]".
  es-coordinating:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: es-coordinating
    environment:
      - node.name=es-coordinating
      - cluster.name=docker-cluster
      - node.roles=[]
      - discovery.seed_hosts=es-master-1,es-master-2,es-master-3
      - cluster.initial_master_nodes=es-master-1,es-master-2,es-master-3
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms4g -Xmx4g"
      - xpack.security.enabled=false
      - xpack.security.transport.ssl.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    volumes:
      - es-coordinating-logs:/usr/share/elasticsearch/logs
    ports:
      - "9200:9200"
    networks:
      - elastic
    deploy:
      resources:
        limits:
          memory: 8g
    depends_on:
      - es-master-1
      - es-master-2
      - es-master-3
      - es-data-1
      - es-data-2

  # Kibana (talks to the cluster through the coordinating node)
  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://es-coordinating:9200
      - SERVER_NAME=kibana
      - SERVER_HOST=0.0.0.0
    ports:
      - "5601:5601"
    networks:
      - elastic
    depends_on:
      - es-coordinating

# Named volumes: data + logs per master/data node, logs only for the
# coordinating node
volumes:
  es-master-1-data:
  es-master-1-logs:
  es-master-2-data:
  es-master-2-logs:
  es-master-3-data:
  es-master-3-logs:
  es-data-1-data:
  es-data-1-logs:
  es-data-2-data:
  es-data-2-logs:
  es-coordinating-logs:

# Shared bridge network joined by every service
networks:
  elastic:
    driver: bridge

3. 集群监控与告警

3.1 监控系统搭建

import json
import logging
import smtplib
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, Dict, List

import requests

class ElasticsearchClusterMonitor:
    """Polls an Elasticsearch cluster over HTTP and alerts on threshold breaches.

    Each check returns a list of alert dicts with the keys ``type``,
    ``severity``, ``message`` and ``details``. Alerts are delivered by
    e-mail and/or a DingTalk webhook, depending on ``alert_config``, and are
    always written to the log.
    """

    def __init__(self, cluster_hosts: List[str], alert_config: Dict[str, Any]):
        """Create a monitor.

        Args:
            cluster_hosts: Hosts queried on port 9200, tried in order until
                one answers.
            alert_config: May contain an ``email`` dict (``smtp_server``,
                ``smtp_port``, ``from``, ``to``, optional ``use_tls``,
                ``username``, ``password``) and a ``dingtalk`` dict
                (``webhook_url``).
        """
        self.cluster_hosts = cluster_hosts
        self.alert_config = alert_config
        self.logger = self._setup_logging()

        # Alert thresholds
        self.thresholds = {
            'cluster_status': ['red'],
            'heap_usage': 85,  # percent
            'disk_usage': 90,  # percent
            'cpu_usage': 80,   # percent
            'unassigned_shards': 0,
            'search_latency': 1000,  # milliseconds
            'indexing_latency': 500   # milliseconds
        }

    def _setup_logging(self) -> logging.Logger:
        """Return the ``es_monitor`` logger, writing to the monitor log file."""
        logger = logging.getLogger('es_monitor')
        logger.setLevel(logging.INFO)

        # getLogger returns the same object on every call; attach the file
        # handler only once so a second instance does not duplicate lines.
        if not logger.handlers:
            handler = logging.FileHandler('/var/log/elasticsearch/monitor.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        return logger

    def _get_json(self, path: str, warn_prefix: str, failure_message: str) -> Dict[str, Any]:
        """GET ``path`` from the first host that answers successfully.

        Args:
            path: URL path starting with ``/``.
            warn_prefix: Text logged in front of the host when it fails.
            failure_message: Message of the exception raised when no host
                answered.

        Raises:
            Exception: If every host failed or returned an HTTP error.
        """
        for host in self.cluster_hosts:
            try:
                response = requests.get(f"http://{host}:9200{path}", timeout=10)
                # An HTTP error body is not the document the caller expects;
                # treat it as a failure of this host and try the next one.
                response.raise_for_status()
                return response.json()
            except Exception as e:
                self.logger.warning(f"{warn_prefix} {host}: {e}")

        raise Exception(failure_message)

    def get_cluster_health(self) -> Dict[str, Any]:
        """Fetch ``/_cluster/health``; raises if no host can be reached."""
        return self._get_json(
            "/_cluster/health", "无法连接到", "无法连接到任何Elasticsearch节点"
        )

    def get_node_stats(self) -> Dict[str, Any]:
        """Fetch ``/_nodes/stats``; raises if no host can be reached."""
        return self._get_json(
            "/_nodes/stats", "无法获取节点统计信息", "无法获取节点统计信息"
        )

    def get_index_stats(self) -> Dict[str, Any]:
        """Fetch ``/_stats``; raises if no host can be reached."""
        return self._get_json(
            "/_stats", "无法获取索引统计信息", "无法获取索引统计信息"
        )

    def check_cluster_health(self) -> List[Dict[str, Any]]:
        """Alert on a cluster status listed in the thresholds and on unassigned shards.

        A failure to obtain the health document is itself reported as a
        critical ``connection_error`` alert.
        """
        alerts = []

        try:
            health = self.get_cluster_health()

            if health['status'] in self.thresholds['cluster_status']:
                alerts.append({
                    'type': 'cluster_status',
                    'severity': 'critical',
                    'message': f"集群状态异常: {health['status']}",
                    'details': health
                })

            if health['unassigned_shards'] > self.thresholds['unassigned_shards']:
                alerts.append({
                    'type': 'unassigned_shards',
                    'severity': 'warning',
                    'message': f"存在未分配分片: {health['unassigned_shards']}",
                    'details': health
                })

        except Exception as e:
            alerts.append({
                'type': 'connection_error',
                'severity': 'critical',
                'message': f"无法获取集群健康状态: {e}",
                'details': {}
            })

        return alerts

    def check_node_performance(self) -> List[Dict[str, Any]]:
        """Alert per node on heap, disk and CPU usage above the thresholds.

        A failure to obtain or read the node statistics is reported as a
        ``node_stats_error`` warning.
        """
        alerts = []

        try:
            node_stats = self.get_node_stats()

            for node_id, node_data in node_stats['nodes'].items():
                node_name = node_data['name']

                # Heap usage
                heap_percent = node_data['jvm']['mem']['heap_used_percent']
                if heap_percent > self.thresholds['heap_usage']:
                    alerts.append({
                        'type': 'heap_usage',
                        'severity': 'warning',
                        'message': f"节点 {node_name} 堆内存使用率过高: {heap_percent}%",
                        'details': {'node': node_name, 'heap_percent': heap_percent}
                    })

                # Disk usage; a reported total of 0 bytes would divide by
                # zero and abort the checks for all remaining nodes.
                fs_data = node_data['fs']['total']
                total_bytes = fs_data['total_in_bytes']
                if total_bytes > 0:
                    disk_percent = (total_bytes - fs_data['available_in_bytes']) / total_bytes * 100
                    if disk_percent > self.thresholds['disk_usage']:
                        alerts.append({
                            'type': 'disk_usage',
                            'severity': 'critical',
                            'message': f"节点 {node_name} 磁盘使用率过高: {disk_percent:.1f}%",
                            'details': {'node': node_name, 'disk_percent': disk_percent}
                        })

                # CPU usage (only when the node reports it)
                if 'os' in node_data and 'cpu' in node_data['os']:
                    cpu_percent = node_data['os']['cpu']['percent']
                    if cpu_percent > self.thresholds['cpu_usage']:
                        alerts.append({
                            'type': 'cpu_usage',
                            'severity': 'warning',
                            'message': f"节点 {node_name} CPU使用率过高: {cpu_percent}%",
                            'details': {'node': node_name, 'cpu_percent': cpu_percent}
                        })

        except Exception as e:
            alerts.append({
                'type': 'node_stats_error',
                'severity': 'warning',
                'message': f"无法获取节点统计信息: {e}",
                'details': {}
            })

        return alerts

    def check_performance_metrics(self) -> List[Dict[str, Any]]:
        """Alert on average search and indexing time per operation.

        The averages are total time divided by total operation count as
        reported under ``_all.total``, i.e. they cover everything those
        counters have accumulated, not just the last interval.
        """
        alerts = []

        try:
            index_stats = self.get_index_stats()
            totals = index_stats['_all']['total']

            # Search latency
            search_time = totals['search']['query_time_in_millis']
            search_count = totals['search']['query_total']
            if search_count > 0:
                avg_search_time = search_time / search_count
                if avg_search_time > self.thresholds['search_latency']:
                    alerts.append({
                        'type': 'search_latency',
                        'severity': 'warning',
                        'message': f"搜索延迟过高: {avg_search_time:.2f}ms",
                        'details': {'avg_search_time': avg_search_time}
                    })

            # Indexing latency
            index_time = totals['indexing']['index_time_in_millis']
            index_count = totals['indexing']['index_total']
            if index_count > 0:
                avg_index_time = index_time / index_count
                if avg_index_time > self.thresholds['indexing_latency']:
                    alerts.append({
                        'type': 'indexing_latency',
                        'severity': 'warning',
                        'message': f"索引延迟过高: {avg_index_time:.2f}ms",
                        'details': {'avg_index_time': avg_index_time}
                    })

        except Exception as e:
            alerts.append({
                'type': 'performance_metrics_error',
                'severity': 'warning',
                'message': f"无法获取性能指标: {e}",
                'details': {}
            })

        return alerts

    def send_alert(self, alerts: List[Dict[str, Any]]):
        """Deliver alerts through every configured channel and log them.

        Does nothing when ``alerts`` is empty.
        """
        if not alerts:
            return

        # E-mail channel
        if self.alert_config.get('email'):
            self._send_email_alert(alerts)

        # DingTalk channel
        if self.alert_config.get('dingtalk'):
            self._send_dingtalk_alert(alerts)

        # Always record the alerts in the log
        for alert in alerts:
            self.logger.error(f"告警: {alert['message']}")

    def _send_email_alert(self, alerts: List[Dict[str, Any]]):
        """Send all alerts in one plain-text e-mail; failures are logged, not raised."""
        try:
            email_config = self.alert_config['email']

            msg = MIMEMultipart()
            msg['From'] = email_config['from']
            msg['To'] = ', '.join(email_config['to'])
            msg['Subject'] = f"Elasticsearch集群告警 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

            # Build the message body, one section per alert
            body = "Elasticsearch集群监控告警:\n\n"
            for alert in alerts:
                body += f"类型: {alert['type']}\n"
                body += f"严重程度: {alert['severity']}\n"
                body += f"消息: {alert['message']}\n"
                body += f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                body += "-" * 50 + "\n"

            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            # The context manager closes the connection even when TLS
            # negotiation, login or sending fails.
            with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
                if email_config.get('use_tls'):
                    server.starttls()
                if email_config.get('username'):
                    server.login(email_config['username'], email_config['password'])

                server.send_message(msg)

            self.logger.info("邮件告警发送成功")

        except Exception as e:
            self.logger.error(f"发送邮件告警失败: {e}")

    def _send_dingtalk_alert(self, alerts: List[Dict[str, Any]]):
        """Post all alerts as one text message to the DingTalk webhook.

        Failures are logged, not raised.
        """
        try:
            dingtalk_config = self.alert_config['dingtalk']

            # Build the message text
            text = "Elasticsearch集群监控告警:\n\n"
            for alert in alerts:
                text += f"🚨 {alert['message']}\n"

            text += f"\n时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

            payload = {
                "msgtype": "text",
                "text": {
                    "content": text
                }
            }

            response = requests.post(
                dingtalk_config['webhook_url'],
                json=payload,
                timeout=10
            )

            delivered = response.status_code == 200
            if delivered:
                # The webhook reports rejections (bad token, keyword filter,
                # rate limit) with HTTP 200 and a non-zero errcode in the body.
                try:
                    delivered = response.json().get('errcode', 0) == 0
                except ValueError:
                    pass  # body is not JSON: keep the HTTP status verdict

            if delivered:
                self.logger.info("钉钉告警发送成功")
            else:
                self.logger.error(f"钉钉告警发送失败: {response.text}")

        except Exception as e:
            self.logger.error(f"发送钉钉告警失败: {e}")

    def run_monitoring(self, interval: int = 60):
        """Run all checks forever, sleeping ``interval`` seconds between rounds.

        An unexpected exception in one round is logged and the loop goes on.
        """
        self.logger.info("开始Elasticsearch集群监控")

        while True:
            try:
                all_alerts = []
                all_alerts.extend(self.check_cluster_health())
                all_alerts.extend(self.check_node_performance())
                all_alerts.extend(self.check_performance_metrics())

                if all_alerts:
                    self.send_alert(all_alerts)
                else:
                    self.logger.info("集群状态正常")

            except Exception as e:
                self.logger.error(f"监控异常: {e}")

            time.sleep(interval)

# Usage example
if __name__ == "__main__":
    # The coordinating nodes serve as HTTP entry points
    cluster_hosts = ['10.0.3.10', '10.0.3.11']

    # E-mail channel settings
    email_settings = {
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'use_tls': True,
        'from': 'monitor@company.com',
        'to': ['admin@company.com', 'ops@company.com'],
        'username': 'monitor@company.com',
        'password': 'password'
    }

    # DingTalk channel settings
    dingtalk_settings = {
        'webhook_url': 'https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN'
    }

    alert_config = {'email': email_settings, 'dingtalk': dingtalk_settings}

    # Build the monitor and poll once per minute
    monitor = ElasticsearchClusterMonitor(cluster_hosts, alert_config)
    monitor.run_monitoring(interval=60)

4. 备份与恢复

4.1 快照备份策略

# Register a shared-filesystem ("fs") snapshot repository named
# backup_repository, with compression on and snapshot/restore throughput
# capped at 100mb per second.
# NOTE(review): an "fs" location normally has to be allowed through
# path.repo in elasticsearch.yml on every node - confirm it is configured.
PUT /_snapshot/backup_repository
{
  "type": "fs",
  "settings": {
    "location": "/backup/elasticsearch",
    "compress": true,
    "chunk_size": "1gb",
    "max_restore_bytes_per_sec": "100mb",
    "max_snapshot_bytes_per_sec": "100mb"
  }
}

# Create the snapshot lifecycle policy: snapshot logs-* and metrics-* into
# backup_repository every day at 02:00, keep snapshots for 30 days while
# always retaining between 7 and 50 of them.
# The schedule uses Elasticsearch cron syntax, which starts with a seconds
# field and needs "?" for either day-of-month or day-of-week; a 5-field
# Unix cron expression is rejected.
PUT /_slm/policy/daily_backup
{
  "schedule": "0 0 2 * * ?",
  "name": "<daily-backup-{now/d}>",
  "repository": "backup_repository",
  "config": {
    "indices": ["logs-*", "metrics-*"],
    "ignore_unavailable": true,
    "include_global_state": false,
    "metadata": {
      "taken_by": "automated_backup",
      "taken_because": "daily scheduled backup"
    }
  },
  "retention": {
    "expire_after": "30d",
    "min_count": 7,
    "max_count": 50
  }
}

# Run the snapshot policy once, immediately
POST /_slm/policy/daily_backup/_execute

# List every snapshot in the repository
GET /_snapshot/backup_repository/_all

# Restore one index from a snapshot under a new name (logs-X becomes
# restored-logs-X) with replicas set to 0.
# NOTE(review): the snapshot name below is an example; names generated by
# the policy may carry a different date format and a suffix - take the
# real name from the listing above.
POST /_snapshot/backup_repository/daily-backup-2024-01-01/_restore
{
  "indices": "logs-2024-01-01",
  "ignore_unavailable": true,
  "include_global_state": false,
  "rename_pattern": "logs-(.+)",
  "rename_replacement": "restored-logs-$1",
  "index_settings": {
    "index.number_of_replicas": 0
  }
}

4.2 自动化备份脚本

import json
import logging
import os
import shutil
import subprocess
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests

class ElasticsearchBackupManager:
    """Create, monitor, prune, upload and restore Elasticsearch snapshots.

    Every cluster operation goes through the snapshot REST API of the node
    given as ``es_host`` (plain HTTP on port 9200).

    ``backup_config`` keys read by this class:
        repository_name: name of the snapshot repository.
        local_path: filesystem location used for the ``fs`` repository.
        retention_days: optional; snapshots older than this many days are
            deleted by ``run_backup`` (default 30).
        cloud_upload: optional dict with ``type`` (``'s3'`` or ``'oss'``),
            ``bucket`` and, for S3, an optional ``region``.
    """

    # Terminal snapshot states other than SUCCESS/FAILED; a snapshot in one of
    # these states will not make further progress, so polling must stop.
    _UNSUCCESSFUL_STATES = ('PARTIAL', 'INCOMPATIBLE')

    def __init__(self, es_host: str, backup_config: Dict[str, Any]):
        self.es_host = es_host
        self.es_url = f"http://{es_host}:9200"
        self.backup_config = backup_config
        self.logger = self._setup_logging()

    def _setup_logging(self) -> logging.Logger:
        """Return the shared ``es_backup`` logger, writing to the backup log file."""
        logger = logging.getLogger('es_backup')
        logger.setLevel(logging.INFO)

        # logging.getLogger returns the same object on every call, so only
        # attach the file handler once; otherwise each new manager instance
        # would duplicate every log line.
        if not any(isinstance(h, logging.FileHandler) for h in logger.handlers):
            handler = logging.FileHandler('/var/log/elasticsearch/backup.log')
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            logger.addHandler(handler)

        return logger

    def _repository_url(self) -> str:
        """Return the REST URL of the configured snapshot repository."""
        return f"{self.es_url}/_snapshot/{self.backup_config['repository_name']}"

    def setup_repository(self) -> bool:
        """Register (or update) the ``fs`` snapshot repository.

        Returns:
            True when Elasticsearch answers 200, False on any other status or
            on an exception (both are logged).
        """
        try:
            repository_config = {
                "type": "fs",
                "settings": {
                    "location": self.backup_config['local_path'],
                    "compress": True,
                    "chunk_size": "1gb",
                    "max_restore_bytes_per_sec": "100mb",
                    "max_snapshot_bytes_per_sec": "100mb"
                }
            }

            response = requests.put(
                self._repository_url(),
                json=repository_config,
                timeout=30
            )

            if response.status_code == 200:
                self.logger.info("备份仓库设置成功")
                return True

            self.logger.error(f"备份仓库设置失败: {response.text}")
            return False

        except Exception as e:
            self.logger.error(f"设置备份仓库异常: {e}")
            return False

    def create_snapshot(self, snapshot_name: str, indices: Optional[List[str]] = None) -> bool:
        """Ask Elasticsearch to start a snapshot.

        Args:
            snapshot_name: name of the snapshot to create.
            indices: index patterns to include; all indices when omitted.

        Returns:
            True when the request was accepted with status 200. The request
            does not wait for the snapshot to finish; use
            ``wait_for_snapshot_completion`` for that.
        """
        try:
            snapshot_config = {
                "indices": indices or ["*"],
                "ignore_unavailable": True,
                "include_global_state": False,
                "metadata": {
                    "taken_by": "backup_manager",
                    "taken_at": datetime.now().isoformat(),
                    "description": f"Automated backup - {snapshot_name}"
                }
            }

            response = requests.put(
                f"{self._repository_url()}/{snapshot_name}",
                json=snapshot_config,
                timeout=30
            )

            if response.status_code == 200:
                self.logger.info(f"快照 {snapshot_name} 创建成功")
                return True

            self.logger.error(f"快照 {snapshot_name} 创建失败: {response.text}")
            return False

        except Exception as e:
            self.logger.error(f"创建快照异常: {e}")
            return False

    def wait_for_snapshot_completion(self, snapshot_name: str, timeout: int = 3600,
                                     poll_interval: int = 30) -> bool:
        """Poll a snapshot until it reaches a terminal state or the timeout expires.

        Args:
            snapshot_name: snapshot to watch.
            timeout: maximum time to wait, in seconds.
            poll_interval: pause between two status requests, in seconds.

        Returns:
            True only when the snapshot reports ``SUCCESS``. False when it
            reports ``FAILED``, ``PARTIAL`` or ``INCOMPATIBLE``, when the
            status request returns a non-200 answer, or on timeout.
        """
        # A monotonic clock is immune to wall-clock adjustments, and unlike
        # timedelta.seconds it does not wrap around after one day.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            try:
                response = requests.get(
                    f"{self._repository_url()}/{snapshot_name}",
                    timeout=10
                )

                if response.status_code != 200:
                    self.logger.error(f"无法获取快照状态: {response.text}")
                    return False

                state = response.json()['snapshots'][0]['state']

                if state == 'SUCCESS':
                    self.logger.info(f"快照 {snapshot_name} 完成")
                    return True
                if state == 'FAILED':
                    self.logger.error(f"快照 {snapshot_name} 失败")
                    return False
                if state in self._UNSUCCESSFUL_STATES:
                    self.logger.error(f"快照 {snapshot_name} 未成功完成: {state}")
                    return False

                if state in ['IN_PROGRESS', 'STARTED']:
                    self.logger.info(f"快照 {snapshot_name} 进行中...")
                else:
                    self.logger.warning(f"快照 {snapshot_name} 状态未知: {state}")
                time.sleep(poll_interval)

            except Exception as e:
                # Transient errors (network hiccup, malformed answer) are
                # retried until the deadline.
                self.logger.error(f"检查快照状态异常: {e}")
                time.sleep(poll_interval)

        self.logger.error(f"快照 {snapshot_name} 超时")
        return False

    def list_snapshots(self) -> List[Dict[str, Any]]:
        """Return the snapshot descriptions of the repository, or [] on any error."""
        try:
            response = requests.get(
                f"{self._repository_url()}/_all",
                timeout=30
            )

            if response.status_code == 200:
                return response.json()['snapshots']

            self.logger.error(f"获取快照列表失败: {response.text}")
            return []

        except Exception as e:
            self.logger.error(f"获取快照列表异常: {e}")
            return []

    def delete_old_snapshots(self, retention_days: int = 30) -> int:
        """Delete every snapshot in the repository older than ``retention_days``.

        Age is measured from the snapshot's ``start_time``. Snapshots whose
        start time is missing or unparsable are skipped with a warning.

        Returns:
            Number of snapshots Elasticsearch confirmed as deleted.
        """
        deleted_count = 0
        # The parsed start time carries a UTC offset, so the cutoff must be
        # timezone-aware too; comparing aware with naive raises TypeError.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=retention_days)

        for snapshot in self.list_snapshots():
            snapshot_name = snapshot.get('snapshot')
            try:
                start_time = datetime.fromisoformat(
                    snapshot['start_time'].replace('Z', '+00:00')
                )
            except (KeyError, AttributeError, ValueError) as e:
                self.logger.warning(f"跳过无法解析开始时间的快照: {snapshot_name} - {e}")
                continue

            if start_time.tzinfo is None:
                # No offset in the timestamp: treat it as UTC so the
                # comparison below stays valid.
                start_time = start_time.replace(tzinfo=timezone.utc)

            if start_time >= cutoff_date:
                continue

            try:
                response = requests.delete(
                    f"{self._repository_url()}/{snapshot_name}",
                    timeout=30
                )

                if response.status_code == 200:
                    self.logger.info(f"删除过期快照: {snapshot_name}")
                    deleted_count += 1
                else:
                    self.logger.error(f"删除快照失败: {snapshot_name} - {response.text}")

            except Exception as e:
                self.logger.error(f"删除快照异常: {snapshot_name} - {e}")

        return deleted_count

    def upload_to_cloud(self, snapshot_name: str) -> bool:
        """Copy the local repository data to the configured cloud bucket.

        Returns:
            True when no ``cloud_upload`` section is configured or the upload
            command exits with 0; False when the command fails, the storage
            type is not supported, or an exception occurs.
        """
        cloud_config = self.backup_config.get('cloud_upload')
        if not cloud_config:
            return True

        try:
            # NOTE(review): only the repository's "indices" sub-directory is
            # uploaded, not the repository root - confirm whether the cloud
            # copy is meant to be restorable on its own.
            local_path = os.path.join(self.backup_config['local_path'], 'indices')
            storage_type = cloud_config['type']

            if storage_type == 's3':
                label = 'S3'
                cmd = [
                    'aws', 's3', 'sync',
                    local_path,
                    f"s3://{cloud_config['bucket']}/elasticsearch-backups/{snapshot_name}/",
                    '--delete'
                ]
                # Honour the configured region instead of silently relying on
                # the CLI's ambient default.
                if cloud_config.get('region'):
                    cmd += ['--region', cloud_config['region']]
            elif storage_type == 'oss':
                label = 'OSS'
                cmd = [
                    'ossutil', 'cp',
                    local_path,
                    f"oss://{cloud_config['bucket']}/elasticsearch-backups/{snapshot_name}/",
                    '-r', '-f'
                ]
            else:
                self.logger.error(f"不支持的云存储类型: {storage_type}")
                return False

            result = subprocess.run(cmd, capture_output=True, text=True)

            if result.returncode == 0:
                self.logger.info(f"快照 {snapshot_name} 上传到{label}成功")
                return True

            self.logger.error(f"{label}上传失败: {result.stderr}")
            return False

        except Exception as e:
            self.logger.error(f"云存储上传异常: {e}")
            return False

    def restore_snapshot(self, snapshot_name: str, indices: Optional[List[str]] = None,
                        rename_pattern: Optional[str] = None,
                        rename_replacement: Optional[str] = None) -> bool:
        """Start restoring a snapshot.

        Args:
            snapshot_name: snapshot to restore from.
            indices: index patterns to restore; all indices when omitted.
            rename_pattern: regular expression applied to restored index names.
            rename_replacement: replacement used with ``rename_pattern``. The
                rename is only sent when both values are given.

        Returns:
            True when the restore request was accepted with status 200.
            Restored indices are created with zero replicas.
        """
        try:
            restore_config = {
                "indices": indices or ["*"],
                "ignore_unavailable": True,
                "include_global_state": False,
                "index_settings": {
                    "index.number_of_replicas": 0
                }
            }

            if rename_pattern and rename_replacement:
                restore_config["rename_pattern"] = rename_pattern
                restore_config["rename_replacement"] = rename_replacement

            response = requests.post(
                f"{self._repository_url()}/{snapshot_name}/_restore",
                json=restore_config,
                timeout=30
            )

            if response.status_code == 200:
                self.logger.info(f"快照 {snapshot_name} 恢复启动成功")
                return True

            self.logger.error(f"快照 {snapshot_name} 恢复失败: {response.text}")
            return False

        except Exception as e:
            self.logger.error(f"恢复快照异常: {e}")
            return False

    def run_backup(self, indices: Optional[List[str]] = None) -> bool:
        """Run the whole backup workflow for the given index patterns.

        Steps: register the repository, create a timestamped snapshot, wait
        for it, upload to cloud storage, prune expired snapshots. A failed
        upload is only logged as a warning; failure of any of the first three
        steps aborts the workflow.

        Returns:
            True when the snapshot completed successfully, False otherwise.
        """
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        snapshot_name = f"backup_{timestamp}"

        self.logger.info(f"开始备份流程: {snapshot_name}")

        # 1. Register the repository
        if not self.setup_repository():
            return False

        # 2. Start the snapshot
        if not self.create_snapshot(snapshot_name, indices):
            return False

        # 3. Wait until the snapshot is finished
        if not self.wait_for_snapshot_completion(snapshot_name):
            return False

        # 4. Upload to cloud storage (best effort)
        if not self.upload_to_cloud(snapshot_name):
            self.logger.warning("云存储上传失败,但本地备份成功")

        # 5. Prune expired snapshots
        deleted_count = self.delete_old_snapshots(
            self.backup_config.get('retention_days', 30)
        )
        self.logger.info(f"清理了 {deleted_count} 个过期快照")

        self.logger.info(f"备份流程完成: {snapshot_name}")
        return True

# Usage example
if __name__ == "__main__":
    # Where the copy in cloud storage goes
    s3_target = {
        'type': 's3',
        'bucket': 'my-es-backups',
        'region': 'us-west-2'
    }

    # Repository, retention and upload settings
    settings = {
        'repository_name': 'backup_repository',
        'local_path': '/backup/elasticsearch',
        'retention_days': 30,
        'cloud_upload': s3_target
    }

    # Build the manager against a coordinating node and back up logs and metrics
    ElasticsearchBackupManager('10.0.3.10', settings).run_backup(['logs-*', 'metrics-*'])

5. 故障排查与恢复

5.1 常见故障诊断

import requests
import json
from typing import Dict, List, Any

class ElasticsearchTroubleshooter:
    """Collect health, node, shard, performance, disk and JVM findings from a cluster.

    Every check talks to the REST API of ``es_host`` (plain HTTP, port 9200)
    and returns a dict with at least ``issues`` and ``recommendations``. A
    check never raises: when the data cannot be fetched or parsed, the failure
    itself is reported as an issue.
    """

    def __init__(self, es_host: str):
        self.es_host = es_host
        self.es_url = f"http://{es_host}:9200"

    def _get_json(self, path: str) -> Any:
        """GET ``path`` on the cluster and return the decoded JSON body.

        Raises on HTTP error statuses so callers report the real cause instead
        of a KeyError from an unexpected error payload.
        """
        response = requests.get(f"{self.es_url}{path}", timeout=10)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _to_float(value: Any):
        """Return ``value`` as a float, or None when it is missing or not numeric."""
        try:
            return float(str(value).replace('%', ''))
        except (TypeError, ValueError):
            return None

    def diagnose_cluster_issues(self) -> Dict[str, Any]:
        """Run every check and return the results keyed by category name."""
        diagnosis = {
            'cluster_health': self._check_cluster_health(),
            'node_issues': self._check_node_issues(),
            'shard_issues': self._check_shard_issues(),
            'performance_issues': self._check_performance_issues(),
            'disk_issues': self._check_disk_issues(),
            'memory_issues': self._check_memory_issues()
        }

        return diagnosis

    def _check_cluster_health(self) -> Dict[str, Any]:
        """Report red/yellow status and unassigned shards from ``_cluster/health``."""
        try:
            health = self._get_json("/_cluster/health")

            issues = []
            recommendations = []

            if health['status'] == 'red':
                issues.append("集群状态为红色,存在不可用的主分片")
                recommendations.append("检查节点状态和分片分配")
            elif health['status'] == 'yellow':
                issues.append("集群状态为黄色,存在未分配的副本分片")
                recommendations.append("检查副本分片配置和节点容量")

            if health['unassigned_shards'] > 0:
                issues.append(f"存在 {health['unassigned_shards']} 个未分配分片")
                recommendations.append("使用 _cluster/allocation/explain API 查看分配失败原因")

            return {
                'status': health['status'],
                'issues': issues,
                'recommendations': recommendations,
                'details': health
            }

        except Exception as e:
            return {
                'status': 'unknown',
                'issues': [f"无法获取集群健康状态: {e}"],
                'recommendations': ["检查Elasticsearch服务是否运行"],
                'details': {}
            }

    def _check_node_issues(self) -> Dict[str, Any]:
        """Flag nodes whose heap usage exceeds 85% or disk usage exceeds 90%.

        The cat API is queried with ``format=json`` so values are addressed by
        column name instead of by whitespace position.
        """
        try:
            nodes_info = self._get_json(
                "/_cat/nodes?format=json"
                "&h=name,heap.percent,ram.percent,cpu,load_1m,disk.used_percent,master"
            )

            issues = []
            recommendations = []

            for node in nodes_info:
                name = node.get('name')
                heap_percent = node.get('heap.percent')
                disk_percent = node.get('disk.used_percent')

                # Heap usage
                heap_val = self._to_float(heap_percent)
                if heap_val is not None and heap_val > 85:
                    issues.append(f"节点 {name} 堆内存使用率过高: {heap_percent}")
                    recommendations.append(f"优化 {name} 节点的JVM配置或增加内存")

                # Disk usage; the value is a decimal such as "92.50", so it has
                # to be parsed as a float rather than gated on str.isdigit().
                disk_val = self._to_float(disk_percent)
                if disk_val is not None and disk_val > 90:
                    issues.append(f"节点 {name} 磁盘使用率过高: {disk_percent}")
                    recommendations.append(f"清理 {name} 节点磁盘空间或添加存储")

            return {
                'issues': issues,
                'recommendations': recommendations,
                'node_count': len(nodes_info)
            }

        except Exception as e:
            return {
                'issues': [f"无法获取节点信息: {e}"],
                'recommendations': ["检查集群连接和节点状态"],
                'node_count': 0
            }

    def _check_shard_issues(self) -> Dict[str, Any]:
        """Report allocation failures and count shards in state ``UNASSIGNED``."""
        try:
            # The HTTP status is deliberately not checked here: an answer
            # without an 'allocate_explanation' key (for example an error
            # payload) simply contributes no issue.
            response = requests.get(f"{self.es_url}/_cluster/allocation/explain", timeout=10)
            allocation_info = response.json()

            issues = []
            recommendations = []

            if 'allocate_explanation' in allocation_info:
                explanation = allocation_info['allocate_explanation']
                issues.append(f"分片分配失败: {explanation}")
                recommendations.append("检查节点容量和分片分配策略")

            # Compare the state column exactly, so an index whose name happens
            # to contain "UNASSIGNED" is not miscounted.
            shards_info = self._get_json(
                "/_cat/shards?format=json&h=index,shard,prirep,state,unassigned.reason"
            )
            unassigned_count = sum(
                1 for shard in shards_info if shard.get('state') == 'UNASSIGNED'
            )

            if unassigned_count > 0:
                issues.append(f"发现 {unassigned_count} 个未分配分片")
                recommendations.append("使用 _cluster/reroute API 手动分配分片")

            return {
                'issues': issues,
                'recommendations': recommendations,
                'unassigned_shards': unassigned_count
            }

        except Exception as e:
            return {
                'issues': [f"无法获取分片信息: {e}"],
                'recommendations': ["检查集群状态和API可用性"],
                'unassigned_shards': 0
            }

    def _check_performance_issues(self) -> Dict[str, Any]:
        """Flag slow average query (>1000 ms) and indexing (>500 ms) times.

        Averages are computed from the cumulative totals in ``_stats``.
        """
        try:
            stats = self._get_json("/_stats")

            issues = []
            recommendations = []

            # Search latency
            search_stats = stats['_all']['total']['search']
            if search_stats['query_total'] > 0:
                avg_search_time = search_stats['query_time_in_millis'] / search_stats['query_total']
                if avg_search_time > 1000:  # more than one second
                    issues.append(f"搜索性能较差,平均响应时间: {avg_search_time:.2f}ms")
                    recommendations.append("优化查询语句和索引结构")

            # Indexing latency
            index_stats = stats['_all']['total']['indexing']
            if index_stats['index_total'] > 0:
                avg_index_time = index_stats['index_time_in_millis'] / index_stats['index_total']
                if avg_index_time > 500:  # more than 500 ms
                    issues.append(f"索引性能较差,平均索引时间: {avg_index_time:.2f}ms")
                    recommendations.append("优化索引配置和批量索引策略")

            return {
                'issues': issues,
                'recommendations': recommendations,
                'search_stats': search_stats,
                'index_stats': index_stats
            }

        except Exception as e:
            return {
                'issues': [f"无法获取性能统计: {e}"],
                'recommendations': ["检查集群状态和统计API"],
                'search_stats': {},
                'index_stats': {}
            }

    def _check_disk_issues(self) -> Dict[str, Any]:
        """Flag nodes whose filesystem usage exceeds 85% (warning) or 90% (critical)."""
        try:
            nodes_stats = self._get_json("/_nodes/stats/fs")

            issues = []
            recommendations = []

            for node_id, node_data in nodes_stats['nodes'].items():
                node_name = node_data['name']
                fs_total = node_data['fs']['total']

                total_bytes = fs_total.get('total_in_bytes', 0)
                available_bytes = fs_total.get('available_in_bytes', 0)
                if total_bytes <= 0:
                    # No usable size reported for this node: skip it rather
                    # than divide by zero and lose every other node's result.
                    continue
                used_percent = (total_bytes - available_bytes) / total_bytes * 100

                if used_percent > 90:
                    issues.append(f"节点 {node_name} 磁盘使用率过高: {used_percent:.1f}%")
                    recommendations.append(f"清理 {node_name} 节点磁盘或扩容")
                elif used_percent > 85:
                    issues.append(f"节点 {node_name} 磁盘使用率较高: {used_percent:.1f}%")
                    recommendations.append(f"监控 {node_name} 节点磁盘使用情况")

            return {
                'issues': issues,
                'recommendations': recommendations
            }

        except Exception as e:
            return {
                'issues': [f"无法获取磁盘信息: {e}"],
                'recommendations': ["检查节点状态和文件系统"]
            }

    def _check_memory_issues(self) -> Dict[str, Any]:
        """Flag nodes with heap usage above 85% or an average GC pause above 100 ms."""
        try:
            nodes_stats = self._get_json("/_nodes/stats/jvm")

            issues = []
            recommendations = []

            for node_id, node_data in nodes_stats['nodes'].items():
                node_name = node_data['name']
                jvm_stats = node_data['jvm']

                # Heap usage
                heap_percent = jvm_stats['mem']['heap_used_percent']
                if heap_percent > 85:
                    issues.append(f"节点 {node_name} JVM堆内存使用率过高: {heap_percent}%")
                    recommendations.append(f"优化 {node_name} 节点JVM配置")

                # Average pause per collection, for each collector
                gc_stats = jvm_stats['gc']['collectors']
                for gc_name, gc_data in gc_stats.items():
                    if gc_data['collection_count'] > 0:
                        avg_gc_time = gc_data['collection_time_in_millis'] / gc_data['collection_count']
                        if avg_gc_time > 100:  # average GC time above 100 ms
                            issues.append(f"节点 {node_name} GC性能较差: {avg_gc_time:.2f}ms")
                            recommendations.append(f"调整 {node_name} 节点GC参数")

            return {
                'issues': issues,
                'recommendations': recommendations
            }

        except Exception as e:
            return {
                'issues': [f"无法获取内存信息: {e}"],
                'recommendations': ["检查JVM状态和内存配置"]
            }

    def generate_diagnosis_report(self) -> str:
        """Render the result of ``diagnose_cluster_issues`` as a Markdown-style report."""
        diagnosis = self.diagnose_cluster_issues()

        # Collect the pieces and join once instead of growing a string.
        parts = ["=== Elasticsearch集群诊断报告 ===\n\n"]

        for category, info in diagnosis.items():
            parts.append(f"## {category.replace('_', ' ').title()}\n")

            issues = info.get('issues')
            recommendations = info.get('recommendations')

            if issues:
                parts.append("### 发现的问题:\n")
                parts.extend(f"- {issue}\n" for issue in issues)
                parts.append("\n")

            if recommendations:
                parts.append("### 建议措施:\n")
                parts.extend(f"- {rec}\n" for rec in recommendations)
                parts.append("\n")

            if not issues:
                parts.append("✅ 未发现问题\n\n")

        return "".join(parts)

# Usage example
if __name__ == "__main__":
    # Diagnose the cluster through a coordinating node and print the report
    print(ElasticsearchTroubleshooter('10.0.3.10').generate_diagnosis_report())

5.2 自动恢复脚本

#!/bin/bash
# elasticsearch_auto_recovery.sh
#
# Unattended, staged recovery attempt for an Elasticsearch cluster.
# Settings below can be overridden from the environment, e.g.
#   ES_HOST=10.0.3.11 ./elasticsearch_auto_recovery.sh

set -e

# Node to talk to, its HTTP port, and the file every log line is appended to.
# The defaults are the values that used to be hard-coded.
ES_HOST="${ES_HOST:-10.0.3.10}"
ES_PORT="${ES_PORT:-9200}"
LOG_FILE="${LOG_FILE:-/var/log/elasticsearch/auto_recovery.log}"

# Print a timestamped message to stdout and append it to $LOG_FILE.
# Arguments: $1 - message text
log() {
    local line
    line="$(date '+%Y-%m-%d %H:%M:%S') - $1"
    # LOG_FILE is quoted so a path containing spaces stays one argument.
    printf '%s\n' "$line" | tee -a "$LOG_FILE"
}

# Print the cluster health status reported by _cluster/health.
# Outputs: the status value on stdout, or "unknown" when the request fails,
#          times out, or the answer carries no status field.
check_cluster_health() {
    local status
    # --max-time keeps an unresponsive node from hanging the whole recovery.
    status=$(curl -s --max-time 10 "http://${ES_HOST}:${ES_PORT}/_cluster/health" \
        | jq -r '.status' 2>/dev/null) || status=""

    # jq prints "null" when the field is absent; normalise both failure shapes.
    if [ -z "$status" ] || [ "$status" = "null" ]; then
        status="unknown"
    fi

    printf '%s\n' "$status"
}

# Ask the cluster to retry allocation of shards that previously failed.
# Does nothing beyond logging when no shard line contains UNASSIGNED.
#
# Only "retry_failed=true" is sent. No allocate_empty_primary command with
# accept_data_loss is issued: that command discards shard data, so it must
# never run automatically.
reallocate_unassigned_shards() {
    log "开始重新分配未分配的分片"

    local shard_table unassigned_shards

    # A failed request is treated like "nothing unassigned" (best effort).
    shard_table=$(curl -s --max-time 30 "http://${ES_HOST}:${ES_PORT}/_cat/shards") \
        || shard_table=""
    # grep exits 1 when nothing matches; that is not an error here.
    unassigned_shards=$(printf '%s\n' "$shard_table" | grep UNASSIGNED) \
        || unassigned_shards=""

    if [ -z "$unassigned_shards" ]; then
        log "没有发现未分配的分片"
        return 0
    fi

    # The response body is discarded to keep the script's output readable.
    if curl -s --max-time 30 -X POST \
        "http://${ES_HOST}:${ES_PORT}/_cluster/reroute?retry_failed=true" > /dev/null; then
        log "分片重新分配命令已发送"
    else
        log "分片重新分配请求失败"
    fi
}

# Clear the cluster caches, then refresh all indices.
clear_caches() {
    local base_url="http://${ES_HOST}:${ES_PORT}"
    local endpoint

    log "清理集群缓存"

    # Same two POST requests, in the same order: cache clear first, refresh second.
    for endpoint in _cache/clear _refresh; do
        curl -X POST "${base_url}/${endpoint}"
    done

    log "缓存清理完成"
}

# Restart every node whose heap usage is above 95%, except the elected master
# (marked "*" in the master column of _cat/nodes).
restart_problematic_nodes() {
    log "检查需要重启的节点"

    local nodes_info node_name heap_percent is_master

    # A failed request yields an empty list, so nothing is restarted.
    nodes_info=$(curl -s --max-time 30 \
        "http://${ES_HOST}:${ES_PORT}/_cat/nodes?h=name,heap.percent,master") \
        || nodes_info=""

    while read -r node_name heap_percent is_master; do
        heap_percent=${heap_percent%\%}

        # Skip blank or malformed lines instead of feeding them to a numeric test.
        [[ "$heap_percent" =~ ^[0-9]+$ ]] || continue

        # 10# forces base 10 so a value like "08" is not parsed as octal.
        if (( 10#$heap_percent > 95 )) && [[ "$is_master" != "*" ]]; then
            log "节点 $node_name 堆内存使用率过高($heap_percent%),准备重启"

            # Adjust the restart command to the actual deployment.
            # stdin is redirected from /dev/null: otherwise ssh would swallow
            # the rest of the node list feeding this loop and only the first
            # matching node would ever be handled.
            if ssh "root@${node_name}" "systemctl restart elasticsearch" < /dev/null; then
                log "节点 $node_name 重启完成"
            else
                log "节点 $node_name 重启失败"
            fi

            sleep 30
        fi
    done <<< "$nodes_info"
}

# Close and reopen every index whose health column is exactly "red".
repair_indices() {
    log "检查需要修复的索引"

    local red_indices index close_rc open_rc

    # Match on the health column only; a plain `grep red` would also pick up
    # healthy indices whose name merely contains "red".
    red_indices=$(curl -s --max-time 30 \
        "http://${ES_HOST}:${ES_PORT}/_cat/indices?h=health,index" \
        | awk '$1 == "red" {print $2}') || red_indices=""

    if [ -z "$red_indices" ]; then
        log "没有发现需要修复的索引"
        return 0
    fi

    while IFS= read -r index; do
        log "修复索引: $index"

        # Close, pause, reopen. The reopen is attempted even when the close
        # request failed, so an index is never deliberately left closed.
        close_rc=0
        curl -s --max-time 60 -X POST \
            "http://${ES_HOST}:${ES_PORT}/${index}/_close" > /dev/null || close_rc=$?
        sleep 5
        open_rc=0
        curl -s --max-time 60 -X POST \
            "http://${ES_HOST}:${ES_PORT}/${index}/_open" > /dev/null || open_rc=$?

        if [ "$close_rc" -eq 0 ] && [ "$open_rc" -eq 0 ]; then
            log "索引 $index 修复完成"
        else
            log "索引 $index 修复失败"
        fi
    done <<< "$red_indices"
}

# Main recovery workflow.
# Escalates step by step (clear caches -> retry shard allocation -> repair
# red indices -> restart overloaded nodes) and stops as soon as the cluster
# reports "green". Sends an e-mail alert when the final status is not green.
main_recovery() {
    local state

    log "开始自动恢复流程"

    state=$(check_cluster_health) || true
    log "当前集群状态: $state"
    if [[ "$state" == "green" ]]; then
        log "集群状态正常,无需恢复"
        return 0
    fi

    # Step 1: clear caches
    clear_caches
    sleep 10

    # Step 2: retry shard allocation, then re-check
    reallocate_unassigned_shards
    sleep 30

    state=$(check_cluster_health) || true
    log "重新分配后集群状态: $state"
    if [[ "$state" == "green" ]]; then
        log "集群恢复成功"
        return 0
    fi

    # Step 3: repair red indices, then re-check
    repair_indices
    sleep 30

    state=$(check_cluster_health) || true
    log "修复索引后集群状态: $state"
    if [[ "$state" == "green" ]]; then
        log "集群恢复成功"
        return 0
    fi

    # Step 4: restart overloaded nodes
    restart_problematic_nodes
    sleep 60

    # Final verdict
    state=$(check_cluster_health) || true
    log "最终集群状态: $state"

    if [[ "$state" != "green" ]]; then
        log "自动恢复失败,需要人工干预"
        # Raise an alert by e-mail
        printf '%s\n' "Elasticsearch集群自动恢复失败,当前状态: $state" \
            | mail -s "ES集群恢复失败告警" admin@company.com
    else
        log "集群恢复成功"
    fi
}

# Entry point: run the recovery workflow
main_recovery "$@"

6. 章节总结

本章详细介绍了Elasticsearch集群的管理与运维,主要内容包括:

6.1 核心知识点

  1. 集群架构规划

    • 节点角色分配(主节点、数据节点、协调节点、机器学习节点)
    • 容量规划方法和工具
    • 成本估算和优化建议
  2. 自动化部署

    • Shell脚本自动化部署
    • Docker Compose容器化部署
    • 系统优化和配置管理
  3. 监控告警系统

    • 集群健康监控
    • 性能指标监控
    • 多渠道告警机制(邮件、钉钉)
  4. 备份恢复策略

    • 快照备份配置
    • 自动化备份脚本
    • 云存储集成
    • 恢复流程管理
  5. 故障诊断与恢复

    • 自动化故障诊断
    • 常见问题排查
    • 自动恢复机制

6.2 最佳实践

  1. 监控策略

    • 建立完善的监控体系
    • 设置合理的告警阈值
    • 定期检查集群健康状态
  2. 备份策略

    • 制定定期备份计划
    • 实施多层备份策略
    • 定期测试恢复流程
  3. 运维自动化

    • 使用脚本自动化日常运维任务
    • 建立故障自动恢复机制
    • 实施配置管理和版本控制

6.3 练习题

  1. 设计一个包含9个节点的Elasticsearch集群架构,要求支持10TB数据存储
  2. 编写一个监控脚本,当集群状态变为yellow时自动发送告警
  3. 实现一个备份策略,要求每日备份并保留30天的快照
  4. 设计一个故障恢复流程,处理节点宕机的情况
  5. 优化一个高负载集群的性能,包括JVM、索引和查询优化

通过本章的学习,你应该能够独立管理和运维Elasticsearch集群,处理各种常见的运维场景和故障情况。