概述

本章将详细介绍HBase集群的管理与运维,包括集群部署、监控、备份恢复、故障排除等关键运维技能。通过Python模拟类来演示各种运维操作和最佳实践。

1. 集群部署与配置

1.1 集群架构管理

import time
import threading
import json
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from datetime import datetime, timedelta
import random
import uuid

class NodeType(Enum):
    """节点类型枚举"""
    MASTER = "master"
    REGION_SERVER = "regionserver"
    ZOOKEEPER = "zookeeper"
    HDFS_NAMENODE = "namenode"
    HDFS_DATANODE = "datanode"

class NodeStatus(Enum):
    """节点状态枚举"""
    RUNNING = "running"
    STOPPED = "stopped"
    STARTING = "starting"
    STOPPING = "stopping"
    ERROR = "error"
    MAINTENANCE = "maintenance"

class DeploymentMode(Enum):
    """部署模式枚举"""
    STANDALONE = "standalone"
    PSEUDO_DISTRIBUTED = "pseudo_distributed"
    FULLY_DISTRIBUTED = "fully_distributed"

@dataclass
class NodeInfo:
    """节点信息数据类"""
    node_id: str
    hostname: str
    ip_address: str
    node_type: NodeType
    status: NodeStatus
    port: int
    start_time: Optional[datetime] = None
    last_heartbeat: Optional[datetime] = None
    cpu_usage: float = 0.0
    memory_usage: float = 0.0
    disk_usage: float = 0.0
    network_io: Dict[str, float] = field(default_factory=dict)
    process_id: Optional[int] = None
    version: str = "2.4.0"
    config: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ClusterConfig:
    """集群配置数据类"""
    cluster_name: str
    deployment_mode: DeploymentMode
    replication_factor: int = 3
    region_size_mb: int = 256
    max_file_size_mb: int = 10240
    compaction_threshold: int = 3
    block_cache_size: float = 0.4
    memstore_size: float = 0.4
    zookeeper_session_timeout: int = 90000
    rpc_timeout: int = 60000
    custom_configs: Dict[str, Any] = field(default_factory=dict)

class HBaseClusterManager:
    """HBase集群管理器"""
    
    def __init__(self, cluster_config: ClusterConfig):
        self.config = cluster_config
        self.nodes: Dict[str, NodeInfo] = {}
        self.cluster_status = NodeStatus.STOPPED
        self.deployment_history: List[Dict[str, Any]] = []
        self._lock = threading.Lock()
        self.monitoring_enabled = False
        self.monitoring_thread = None
    
    def add_node(self, hostname: str, ip_address: str, node_type: NodeType, port: int) -> str:
        """添加节点"""
        node_id = f"{node_type.value}_{hostname}_{port}"
        
        node_info = NodeInfo(
            node_id=node_id,
            hostname=hostname,
            ip_address=ip_address,
            node_type=node_type,
            status=NodeStatus.STOPPED,
            port=port
        )
        
        with self._lock:
            self.nodes[node_id] = node_info
        
        self._log_deployment_event("node_added", {
            "node_id": node_id,
            "hostname": hostname,
            "node_type": node_type.value
        })
        
        return node_id
    
    def remove_node(self, node_id: str) -> bool:
        """移除节点"""
        with self._lock:
            if node_id in self.nodes:
                node = self.nodes[node_id]
                if node.status == NodeStatus.RUNNING:
                    self.stop_node(node_id)
                
                del self.nodes[node_id]
                
                self._log_deployment_event("node_removed", {
                    "node_id": node_id,
                    "hostname": node.hostname
                })
                
                return True
        return False
    
    def start_node(self, node_id: str) -> bool:
        """启动节点"""
        if node_id not in self.nodes:
            return False
        
        node = self.nodes[node_id]
        if node.status == NodeStatus.RUNNING:
            return True
        
        # 模拟启动过程
        node.status = NodeStatus.STARTING
        node.start_time = datetime.now()
        
        # 模拟启动延迟
        time.sleep(0.1)
        
        # 模拟启动成功
        node.status = NodeStatus.RUNNING
        node.process_id = random.randint(1000, 9999)
        node.last_heartbeat = datetime.now()
        
        self._log_deployment_event("node_started", {
            "node_id": node_id,
            "hostname": node.hostname,
            "process_id": node.process_id
        })
        
        return True
    
    def stop_node(self, node_id: str) -> bool:
        """停止节点"""
        if node_id not in self.nodes:
            return False
        
        node = self.nodes[node_id]
        if node.status == NodeStatus.STOPPED:
            return True
        
        # 模拟停止过程
        node.status = NodeStatus.STOPPING
        
        # 模拟停止延迟
        time.sleep(0.05)
        
        # 模拟停止成功
        node.status = NodeStatus.STOPPED
        node.process_id = None
        node.last_heartbeat = None
        
        self._log_deployment_event("node_stopped", {
            "node_id": node_id,
            "hostname": node.hostname
        })
        
        return True
    
    def start_cluster(self) -> bool:
        """启动集群"""
        print("启动HBase集群...")
        
        # 按顺序启动不同类型的节点
        startup_order = [
            NodeType.ZOOKEEPER,
            NodeType.HDFS_NAMENODE,
            NodeType.HDFS_DATANODE,
            NodeType.MASTER,
            NodeType.REGION_SERVER
        ]
        
        for node_type in startup_order:
            nodes_of_type = [node_id for node_id, node in self.nodes.items() 
                           if node.node_type == node_type]
            
            for node_id in nodes_of_type:
                if not self.start_node(node_id):
                    print(f"启动节点 {node_id} 失败")
                    return False
                print(f"启动节点 {node_id} 成功")
            
            # 等待节点类型之间的启动间隔
            time.sleep(0.1)
        
        self.cluster_status = NodeStatus.RUNNING
        
        # 启动监控
        self.start_monitoring()
        
        self._log_deployment_event("cluster_started", {
            "cluster_name": self.config.cluster_name,
            "node_count": len(self.nodes)
        })
        
        print("HBase集群启动完成")
        return True
    
    def stop_cluster(self) -> bool:
        """停止集群"""
        print("停止HBase集群...")
        
        # 停止监控
        self.stop_monitoring()
        
        # 按相反顺序停止节点
        shutdown_order = [
            NodeType.REGION_SERVER,
            NodeType.MASTER,
            NodeType.HDFS_DATANODE,
            NodeType.HDFS_NAMENODE,
            NodeType.ZOOKEEPER
        ]
        
        for node_type in shutdown_order:
            nodes_of_type = [node_id for node_id, node in self.nodes.items() 
                           if node.node_type == node_type]
            
            for node_id in nodes_of_type:
                if not self.stop_node(node_id):
                    print(f"停止节点 {node_id} 失败")
                    return False
                print(f"停止节点 {node_id} 成功")
            
            time.sleep(0.05)
        
        self.cluster_status = NodeStatus.STOPPED
        
        self._log_deployment_event("cluster_stopped", {
            "cluster_name": self.config.cluster_name
        })
        
        print("HBase集群停止完成")
        return True
    
    def restart_cluster(self) -> bool:
        """重启集群"""
        print("重启HBase集群...")
        
        if not self.stop_cluster():
            return False
        
        time.sleep(0.2)  # 等待完全停止
        
        return self.start_cluster()
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """获取集群状态"""
        running_nodes = sum(1 for node in self.nodes.values() 
                          if node.status == NodeStatus.RUNNING)
        
        node_status_counts = {}
        for status in NodeStatus:
            node_status_counts[status.value] = sum(1 for node in self.nodes.values() 
                                                  if node.status == status)
        
        node_type_counts = {}
        for node_type in NodeType:
            node_type_counts[node_type.value] = sum(1 for node in self.nodes.values() 
                                                   if node.node_type == node_type)
        
        return {
            "cluster_name": self.config.cluster_name,
            "cluster_status": self.cluster_status.value,
            "total_nodes": len(self.nodes),
            "running_nodes": running_nodes,
            "node_status_distribution": node_status_counts,
            "node_type_distribution": node_type_counts,
            "deployment_mode": self.config.deployment_mode.value,
            "uptime": self._calculate_cluster_uptime()
        }
    
    def get_node_details(self, node_id: str) -> Optional[Dict[str, Any]]:
        """获取节点详细信息"""
        if node_id not in self.nodes:
            return None
        
        node = self.nodes[node_id]
        
        uptime = None
        if node.start_time and node.status == NodeStatus.RUNNING:
            uptime = (datetime.now() - node.start_time).total_seconds()
        
        return {
            "node_id": node.node_id,
            "hostname": node.hostname,
            "ip_address": node.ip_address,
            "node_type": node.node_type.value,
            "status": node.status.value,
            "port": node.port,
            "process_id": node.process_id,
            "version": node.version,
            "uptime_seconds": uptime,
            "cpu_usage_percent": node.cpu_usage,
            "memory_usage_percent": node.memory_usage,
            "disk_usage_percent": node.disk_usage,
            "last_heartbeat": node.last_heartbeat.isoformat() if node.last_heartbeat else None
        }
    
    def update_node_config(self, node_id: str, config_updates: Dict[str, Any]) -> bool:
        """更新节点配置"""
        if node_id not in self.nodes:
            return False
        
        node = self.nodes[node_id]
        node.config.update(config_updates)
        
        self._log_deployment_event("node_config_updated", {
            "node_id": node_id,
            "config_updates": config_updates
        })
        
        return True
    
    def start_monitoring(self):
        """启动监控"""
        if not self.monitoring_enabled:
            self.monitoring_enabled = True
            self.monitoring_thread = threading.Thread(target=self._monitoring_loop)
            self.monitoring_thread.daemon = True
            self.monitoring_thread.start()
    
    def stop_monitoring(self):
        """停止监控"""
        self.monitoring_enabled = False
        if self.monitoring_thread:
            self.monitoring_thread.join(timeout=1)
    
    def _monitoring_loop(self):
        """监控循环"""
        while self.monitoring_enabled:
            self._update_node_metrics()
            time.sleep(5)  # 每5秒更新一次指标
    
    def _update_node_metrics(self):
        """更新节点指标"""
        for node in self.nodes.values():
            if node.status == NodeStatus.RUNNING:
                # 模拟指标更新
                node.cpu_usage = random.uniform(10, 80)
                node.memory_usage = random.uniform(20, 90)
                node.disk_usage = random.uniform(30, 70)
                node.network_io = {
                    "bytes_in": random.uniform(1000, 10000),
                    "bytes_out": random.uniform(500, 5000)
                }
                node.last_heartbeat = datetime.now()
    
    def _calculate_cluster_uptime(self) -> Optional[float]:
        """计算集群运行时间"""
        if self.cluster_status != NodeStatus.RUNNING:
            return None
        
        running_nodes = [node for node in self.nodes.values() 
                        if node.status == NodeStatus.RUNNING and node.start_time]
        
        if not running_nodes:
            return None
        
        # 使用最早启动的节点时间作为集群启动时间
        earliest_start = min(node.start_time for node in running_nodes)
        return (datetime.now() - earliest_start).total_seconds()
    
    def _log_deployment_event(self, event_type: str, details: Dict[str, Any]):
        """记录部署事件"""
        event = {
            "timestamp": datetime.now().isoformat(),
            "event_type": event_type,
            "details": details
        }
        self.deployment_history.append(event)
        
        # 限制历史记录数量
        if len(self.deployment_history) > 1000:
            self.deployment_history = self.deployment_history[-1000:]
    
    def get_deployment_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """获取部署历史"""
        return self.deployment_history[-limit:]

# 集群管理示例
print("=== HBase集群管理示例 ===")

# 创建集群配置
cluster_config = ClusterConfig(
    cluster_name="hbase-prod-cluster",
    deployment_mode=DeploymentMode.FULLY_DISTRIBUTED,
    replication_factor=3,
    region_size_mb=512,
    max_file_size_mb=20480
)

# 创建集群管理器
cluster_manager = HBaseClusterManager(cluster_config)

print("1. 添加集群节点:")

# 添加ZooKeeper节点
zk1_id = cluster_manager.add_node("zk1.example.com", "192.168.1.10", NodeType.ZOOKEEPER, 2181)
zk2_id = cluster_manager.add_node("zk2.example.com", "192.168.1.11", NodeType.ZOOKEEPER, 2181)
zk3_id = cluster_manager.add_node("zk3.example.com", "192.168.1.12", NodeType.ZOOKEEPER, 2181)

# 添加HDFS节点
nn_id = cluster_manager.add_node("namenode.example.com", "192.168.1.20", NodeType.HDFS_NAMENODE, 9000)
dn1_id = cluster_manager.add_node("datanode1.example.com", "192.168.1.21", NodeType.HDFS_DATANODE, 9866)
dn2_id = cluster_manager.add_node("datanode2.example.com", "192.168.1.22", NodeType.HDFS_DATANODE, 9866)

# 添加HBase节点
master_id = cluster_manager.add_node("hmaster.example.com", "192.168.1.30", NodeType.MASTER, 16000)
rs1_id = cluster_manager.add_node("regionserver1.example.com", "192.168.1.31", NodeType.REGION_SERVER, 16020)
rs2_id = cluster_manager.add_node("regionserver2.example.com", "192.168.1.32", NodeType.REGION_SERVER, 16020)
rs3_id = cluster_manager.add_node("regionserver3.example.com", "192.168.1.33", NodeType.REGION_SERVER, 16020)

print(f"  添加了 {len(cluster_manager.nodes)} 个节点")

print("\n2. 启动集群:")
if cluster_manager.start_cluster():
    print("  集群启动成功")
else:
    print("  集群启动失败")

# 等待监控数据更新
time.sleep(1)

print("\n3. 集群状态:")
status = cluster_manager.get_cluster_status()
for key, value in status.items():
    print(f"  {key}: {value}")

print("\n4. 节点详细信息:")
for node_id in [master_id, rs1_id]:
    details = cluster_manager.get_node_details(node_id)
    if details:
        print(f"  节点 {node_id}:")
        for key, value in details.items():
            print(f"    {key}: {value}")
        print()

print("\n5. 更新节点配置:")
config_updates = {
    "hbase.regionserver.handler.count": 30,
    "hbase.hregion.memstore.flush.size": 134217728
}
if cluster_manager.update_node_config(rs1_id, config_updates):
    print(f"  更新节点 {rs1_id} 配置成功")

print("\n6. 部署历史:")
history = cluster_manager.get_deployment_history(10)
for event in history[-5:]:  # 显示最后5个事件
    print(f"  {event['timestamp']}: {event['event_type']} - {event['details']}")

1.2 配置管理

class ConfigType(Enum):
    """配置类型枚举"""
    HBASE_SITE = "hbase-site.xml"
    HBASE_ENV = "hbase-env.sh"
    REGIONSERVER = "regionservers"
    BACKUP_MASTERS = "backup-masters"
    LOG4J = "log4j.properties"

class ConfigScope(Enum):
    """配置范围枚举"""
    CLUSTER = "cluster"
    NODE_TYPE = "node_type"
    INDIVIDUAL_NODE = "individual_node"

@dataclass
class ConfigItem:
    """配置项数据类"""
    key: str
    value: Any
    config_type: ConfigType
    scope: ConfigScope
    description: str = ""
    default_value: Any = None
    validation_rule: Optional[str] = None
    requires_restart: bool = False
    last_modified: Optional[datetime] = None
    modified_by: str = "system"

class HBaseConfigManager:
    """HBase配置管理器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager):
        self.cluster_manager = cluster_manager
        self.configs: Dict[str, ConfigItem] = {}
        self.config_templates: Dict[ConfigType, Dict[str, Any]] = {}
        self.config_history: List[Dict[str, Any]] = []
        self._initialize_default_configs()
    
    def _initialize_default_configs(self):
        """初始化默认配置"""
        default_configs = [
            ConfigItem(
                key="hbase.rootdir",
                value="hdfs://namenode:9000/hbase",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.CLUSTER,
                description="HBase根目录",
                requires_restart=True
            ),
            ConfigItem(
                key="hbase.cluster.distributed",
                value="true",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.CLUSTER,
                description="是否为分布式模式",
                requires_restart=True
            ),
            ConfigItem(
                key="hbase.zookeeper.quorum",
                value="zk1.example.com,zk2.example.com,zk3.example.com",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.CLUSTER,
                description="ZooKeeper集群地址",
                requires_restart=True
            ),
            ConfigItem(
                key="hbase.regionserver.handler.count",
                value="30",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.NODE_TYPE,
                description="RegionServer处理线程数",
                default_value="10",
                validation_rule="range:1-200"
            ),
            ConfigItem(
                key="hbase.hregion.memstore.flush.size",
                value="134217728",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.NODE_TYPE,
                description="MemStore刷新大小(字节)",
                default_value="67108864",
                validation_rule="range:1048576-1073741824"
            ),
            ConfigItem(
                key="hbase.regionserver.global.memstore.size",
                value="0.4",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.NODE_TYPE,
                description="全局MemStore大小比例",
                default_value="0.4",
                validation_rule="range:0.1-0.8"
            ),
            ConfigItem(
                key="hfile.block.cache.size",
                value="0.4",
                config_type=ConfigType.HBASE_SITE,
                scope=ConfigScope.NODE_TYPE,
                description="块缓存大小比例",
                default_value="0.25",
                validation_rule="range:0.1-0.8"
            ),
            ConfigItem(
                key="HBASE_HEAPSIZE",
                value="4G",
                config_type=ConfigType.HBASE_ENV,
                scope=ConfigScope.NODE_TYPE,
                description="HBase堆内存大小",
                default_value="1G",
                requires_restart=True
            )
        ]
        
        for config in default_configs:
            config.last_modified = datetime.now()
            self.configs[config.key] = config
    
    def set_config(self, key: str, value: Any, scope: ConfigScope = ConfigScope.CLUSTER, 
                   modified_by: str = "admin") -> bool:
        """设置配置项"""
        if key in self.configs:
            old_value = self.configs[key].value
            
            # 验证配置值
            if not self._validate_config_value(key, value):
                return False
            
            # 更新配置
            self.configs[key].value = value
            self.configs[key].scope = scope
            self.configs[key].last_modified = datetime.now()
            self.configs[key].modified_by = modified_by
            
            # 记录配置变更历史
            self._log_config_change(key, old_value, value, modified_by)
            
            return True
        else:
            # 创建新配置项
            config_item = ConfigItem(
                key=key,
                value=value,
                config_type=ConfigType.HBASE_SITE,  # 默认类型
                scope=scope,
                last_modified=datetime.now(),
                modified_by=modified_by
            )
            self.configs[key] = config_item
            
            self._log_config_change(key, None, value, modified_by)
            return True
    
    def get_config(self, key: str) -> Optional[ConfigItem]:
        """获取配置项"""
        return self.configs.get(key)
    
    def get_configs_by_type(self, config_type: ConfigType) -> Dict[str, ConfigItem]:
        """按类型获取配置"""
        return {k: v for k, v in self.configs.items() if v.config_type == config_type}
    
    def get_configs_by_scope(self, scope: ConfigScope) -> Dict[str, ConfigItem]:
        """按范围获取配置"""
        return {k: v for k, v in self.configs.items() if v.scope == scope}
    
    def validate_all_configs(self) -> Dict[str, List[str]]:
        """验证所有配置"""
        validation_results = {}
        
        for key, config in self.configs.items():
            errors = []
            
            # 验证配置值
            if not self._validate_config_value(key, config.value):
                errors.append(f"配置值 '{config.value}' 不符合验证规则 '{config.validation_rule}'")
            
            # 检查依赖关系
            dependency_errors = self._check_config_dependencies(key, config)
            errors.extend(dependency_errors)
            
            if errors:
                validation_results[key] = errors
        
        return validation_results
    
    def generate_config_files(self) -> Dict[ConfigType, str]:
        """生成配置文件内容"""
        config_files = {}
        
        # 生成hbase-site.xml
        hbase_site_configs = self.get_configs_by_type(ConfigType.HBASE_SITE)
        hbase_site_xml = self._generate_hbase_site_xml(hbase_site_configs)
        config_files[ConfigType.HBASE_SITE] = hbase_site_xml
        
        # 生成hbase-env.sh
        hbase_env_configs = self.get_configs_by_type(ConfigType.HBASE_ENV)
        hbase_env_sh = self._generate_hbase_env_sh(hbase_env_configs)
        config_files[ConfigType.HBASE_ENV] = hbase_env_sh
        
        # 生成regionservers文件
        regionservers = self._generate_regionservers_file()
        config_files[ConfigType.REGIONSERVER] = regionservers
        
        return config_files
    
    def apply_config_template(self, template_name: str, node_type: NodeType = None) -> bool:
        """应用配置模板"""
        templates = {
            "high_performance": {
                "hbase.regionserver.handler.count": "50",
                "hbase.hregion.memstore.flush.size": "268435456",
                "hfile.block.cache.size": "0.5",
                "HBASE_HEAPSIZE": "8G"
            },
            "memory_optimized": {
                "hbase.regionserver.global.memstore.size": "0.5",
                "hfile.block.cache.size": "0.3",
                "hbase.hregion.memstore.flush.size": "67108864"
            },
            "io_optimized": {
                "hbase.regionserver.handler.count": "100",
                "hbase.regionserver.thread.compaction.large": "4",
                "hbase.regionserver.thread.compaction.small": "2"
            }
        }
        
        if template_name not in templates:
            return False
        
        template_configs = templates[template_name]
        
        for key, value in template_configs.items():
            scope = ConfigScope.NODE_TYPE if node_type else ConfigScope.CLUSTER
            self.set_config(key, value, scope, f"template:{template_name}")
        
        self._log_config_change("template_applied", None, template_name, "system")
        return True
    
    def backup_configs(self) -> str:
        """备份配置"""
        backup_id = f"config_backup_{int(time.time())}"
        
        backup_data = {
            "backup_id": backup_id,
            "timestamp": datetime.now().isoformat(),
            "configs": {}
        }
        
        for key, config in self.configs.items():
            backup_data["configs"][key] = {
                "value": config.value,
                "config_type": config.config_type.value,
                "scope": config.scope.value,
                "description": config.description,
                "last_modified": config.last_modified.isoformat() if config.last_modified else None,
                "modified_by": config.modified_by
            }
        
        # 在实际环境中,这里会保存到文件或数据库
        print(f"配置备份完成: {backup_id}")
        return backup_id
    
    def restore_configs(self, backup_data: Dict[str, Any]) -> bool:
        """恢复配置"""
        try:
            for key, config_data in backup_data["configs"].items():
                config_item = ConfigItem(
                    key=key,
                    value=config_data["value"],
                    config_type=ConfigType(config_data["config_type"]),
                    scope=ConfigScope(config_data["scope"]),
                    description=config_data["description"],
                    modified_by=config_data["modified_by"]
                )
                
                if config_data["last_modified"]:
                    config_item.last_modified = datetime.fromisoformat(config_data["last_modified"])
                
                self.configs[key] = config_item
            
            self._log_config_change("configs_restored", None, backup_data["backup_id"], "system")
            return True
            
        except Exception as e:
            print(f"配置恢复失败: {e}")
            return False
    
    def _validate_config_value(self, key: str, value: Any) -> bool:
        """验证配置值"""
        if key not in self.configs:
            return True  # 新配置项,暂时允许
        
        config = self.configs[key]
        if not config.validation_rule:
            return True
        
        rule = config.validation_rule
        
        # 范围验证
        if rule.startswith("range:"):
            range_part = rule.split(":")[1]
            if "-" in range_part:
                min_val, max_val = map(float, range_part.split("-"))
                try:
                    num_value = float(value)
                    return min_val <= num_value <= max_val
                except ValueError:
                    return False
        
        # 枚举验证
        elif rule.startswith("enum:"):
            enum_values = rule.split(":")[1].split(",")
            return str(value) in enum_values
        
        # 正则表达式验证
        elif rule.startswith("regex:"):
            import re
            pattern = rule.split(":", 1)[1]
            return bool(re.match(pattern, str(value)))
        
        return True
    
    def _check_config_dependencies(self, key: str, config: ConfigItem) -> List[str]:
        """检查配置依赖关系"""
        errors = []
        
        # 检查内存相关配置的总和不超过1.0
        if key in ["hbase.regionserver.global.memstore.size", "hfile.block.cache.size"]:
            memstore_size = float(self.configs.get("hbase.regionserver.global.memstore.size", ConfigItem("", 0.4, ConfigType.HBASE_SITE, ConfigScope.CLUSTER)).value)
            cache_size = float(self.configs.get("hfile.block.cache.size", ConfigItem("", 0.25, ConfigType.HBASE_SITE, ConfigScope.CLUSTER)).value)
            
            if memstore_size + cache_size > 0.9:
                errors.append("MemStore和BlockCache大小总和不应超过90%")
        
        return errors
    
    def _generate_hbase_site_xml(self, configs: Dict[str, ConfigItem]) -> str:
        """生成hbase-site.xml内容"""
        xml_content = '<?xml version="1.0"?>\n'
        xml_content += '<configuration>\n'
        
        for key, config in configs.items():
            xml_content += '  <property>\n'
            xml_content += f'    <name>{key}</name>\n'
            xml_content += f'    <value>{config.value}</value>\n'
            if config.description:
                xml_content += f'    <description>{config.description}</description>\n'
            xml_content += '  </property>\n'
        
        xml_content += '</configuration>\n'
        return xml_content
    
    def _generate_hbase_env_sh(self, configs: Dict[str, ConfigItem]) -> str:
        """生成hbase-env.sh内容"""
        env_content = '#!/bin/bash\n\n'
        env_content += '# HBase Environment Configuration\n\n'
        
        for key, config in configs.items():
            if config.description:
                env_content += f'# {config.description}\n'
            env_content += f'export {key}={config.value}\n\n'
        
        return env_content
    
    def _generate_regionservers_file(self) -> str:
        """生成regionservers文件内容"""
        regionservers = []
        
        for node in self.cluster_manager.nodes.values():
            if node.node_type == NodeType.REGION_SERVER:
                regionservers.append(node.hostname)
        
        return '\n'.join(regionservers) + '\n'
    
    def _log_config_change(self, key: str, old_value: Any, new_value: Any, modified_by: str):
        """记录配置变更"""
        change_record = {
            "timestamp": datetime.now().isoformat(),
            "key": key,
            "old_value": old_value,
            "new_value": new_value,
            "modified_by": modified_by
        }
        
        self.config_history.append(change_record)
        
        # 限制历史记录数量
        if len(self.config_history) > 1000:
            self.config_history = self.config_history[-1000:]
    
    def get_config_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """获取配置变更历史"""
        return self.config_history[-limit:]
    
    def get_configs_requiring_restart(self) -> List[str]:
        """获取需要重启的配置项"""
        return [key for key, config in self.configs.items() if config.requires_restart]

# 配置管理示例
print("\n=== HBase配置管理示例 ===")

# 创建配置管理器
config_manager = HBaseConfigManager(cluster_manager)

print("1. 当前配置:")
for key, config in list(config_manager.configs.items())[:5]:  # 显示前5个配置
    print(f"  {key}: {config.value} ({config.config_type.value})")

print("\n2. 更新配置:")
config_manager.set_config("hbase.regionserver.handler.count", "50", ConfigScope.NODE_TYPE, "admin")
config_manager.set_config("hfile.block.cache.size", "0.5", ConfigScope.CLUSTER, "admin")
print("  配置更新完成")

print("\n3. 验证配置:")
validation_errors = config_manager.validate_all_configs()
if validation_errors:
    for key, errors in validation_errors.items():
        print(f"  {key}: {errors}")
else:
    print("  所有配置验证通过")

print("\n4. 应用性能优化模板:")
if config_manager.apply_config_template("high_performance"):
    print("  高性能模板应用成功")

print("\n5. 生成配置文件:")
config_files = config_manager.generate_config_files()
for config_type, content in config_files.items():
    print(f"  {config_type.value}: {len(content)} 字符")

print("\n6. 配置变更历史:")
history = config_manager.get_config_history(5)
for change in history:
    print(f"  {change['timestamp']}: {change['key']} = {change['new_value']} (by {change['modified_by']})")

print("\n7. 需要重启的配置:")
restart_configs = config_manager.get_configs_requiring_restart()
for config_key in restart_configs:
    print(f"  {config_key}")

2. 监控与告警

2.1 性能监控

class MetricType(Enum):
    """指标类型枚举"""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    TIMER = "timer"

class AlertLevel(Enum):
    """告警级别枚举"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

class AlertStatus(Enum):
    """告警状态枚举"""
    ACTIVE = "active"
    RESOLVED = "resolved"
    SUPPRESSED = "suppressed"

@dataclass
class Metric:
    """指标数据类"""
    name: str
    value: float
    metric_type: MetricType
    timestamp: datetime
    labels: Dict[str, str] = field(default_factory=dict)
    unit: str = ""
    description: str = ""

@dataclass
class AlertRule:
    """告警规则数据类"""
    rule_id: str
    name: str
    metric_name: str
    condition: str  # 例如: "> 80", "< 10", "== 0"
    threshold: float
    level: AlertLevel
    duration: int = 60  # 持续时间(秒)
    enabled: bool = True
    description: str = ""
    notification_channels: List[str] = field(default_factory=list)

@dataclass
class Alert:
    """告警数据类"""
    alert_id: str
    rule_id: str
    metric_name: str
    current_value: float
    threshold: float
    level: AlertLevel
    status: AlertStatus
    start_time: datetime
    end_time: Optional[datetime] = None
    message: str = ""
    labels: Dict[str, str] = field(default_factory=dict)
    acknowledged: bool = False
    acknowledged_by: Optional[str] = None
    acknowledged_time: Optional[datetime] = None

class HBaseMonitor:
    """HBase监控器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager):
        self.cluster_manager = cluster_manager
        self.metrics: Dict[str, List[Metric]] = {}
        self.alert_rules: Dict[str, AlertRule] = {}
        self.active_alerts: Dict[str, Alert] = {}
        self.alert_history: List[Alert] = []
        self.monitoring_enabled = False
        self.monitoring_thread = None
        self._lock = threading.Lock()
        self._initialize_default_rules()
    
    def _initialize_default_rules(self):
        """初始化默认告警规则"""
        default_rules = [
            AlertRule(
                rule_id="cpu_high",
                name="CPU使用率过高",
                metric_name="cpu_usage_percent",
                condition=">",
                threshold=80.0,
                level=AlertLevel.WARNING,
                duration=300,
                description="节点CPU使用率超过80%"
            ),
            AlertRule(
                rule_id="memory_high",
                name="内存使用率过高",
                metric_name="memory_usage_percent",
                condition=">",
                threshold=85.0,
                level=AlertLevel.ERROR,
                duration=180,
                description="节点内存使用率超过85%"
            ),
            AlertRule(
                rule_id="disk_high",
                name="磁盘使用率过高",
                metric_name="disk_usage_percent",
                condition=">",
                threshold=90.0,
                level=AlertLevel.CRITICAL,
                duration=60,
                description="节点磁盘使用率超过90%"
            ),
            AlertRule(
                rule_id="node_down",
                name="节点宕机",
                metric_name="node_status",
                condition="==",
                threshold=0.0,  # 0表示停止状态
                level=AlertLevel.CRITICAL,
                duration=30,
                description="节点状态异常或宕机"
            ),
            AlertRule(
                rule_id="region_count_high",
                name="Region数量过多",
                metric_name="region_count",
                condition=">",
                threshold=1000.0,
                level=AlertLevel.WARNING,
                duration=600,
                description="RegionServer上的Region数量过多"
            )
        ]
        
        for rule in default_rules:
            self.alert_rules[rule.rule_id] = rule
    
    def start_monitoring(self, interval: int = 30):
        """启动监控"""
        if not self.monitoring_enabled:
            self.monitoring_enabled = True
            self.monitoring_thread = threading.Thread(
                target=self._monitoring_loop, 
                args=(interval,)
            )
            self.monitoring_thread.daemon = True
            self.monitoring_thread.start()
            print(f"监控已启动,采集间隔: {interval}秒")
    
    def stop_monitoring(self):
        """停止监控"""
        self.monitoring_enabled = False
        if self.monitoring_thread:
            self.monitoring_thread.join(timeout=5)
        print("监控已停止")
    
    def _monitoring_loop(self, interval: int):
        """监控循环"""
        while self.monitoring_enabled:
            try:
                self._collect_metrics()
                self._evaluate_alerts()
                time.sleep(interval)
            except Exception as e:
                print(f"监控循环错误: {e}")
                time.sleep(interval)
    
    def _collect_metrics(self):
        """收集指标"""
        timestamp = datetime.now()
        
        for node_id, node in self.cluster_manager.nodes.items():
            labels = {
                "node_id": node_id,
                "hostname": node.hostname,
                "node_type": node.node_type.value
            }
            
            # 收集基础指标
            metrics = [
                Metric("cpu_usage_percent", node.cpu_usage, MetricType.GAUGE, timestamp, labels, "%"),
                Metric("memory_usage_percent", node.memory_usage, MetricType.GAUGE, timestamp, labels, "%"),
                Metric("disk_usage_percent", node.disk_usage, MetricType.GAUGE, timestamp, labels, "%"),
                Metric("node_status", 1.0 if node.status == NodeStatus.RUNNING else 0.0, 
                      MetricType.GAUGE, timestamp, labels),
            ]
            
            # 收集网络指标
            if node.network_io:
                metrics.extend([
                    Metric("network_bytes_in", node.network_io.get("bytes_in", 0), 
                          MetricType.COUNTER, timestamp, labels, "bytes"),
                    Metric("network_bytes_out", node.network_io.get("bytes_out", 0), 
                          MetricType.COUNTER, timestamp, labels, "bytes")
                ])
            
            # 模拟HBase特定指标
            if node.node_type == NodeType.REGION_SERVER:
                metrics.extend([
                    Metric("region_count", random.randint(50, 1200), MetricType.GAUGE, timestamp, labels),
                    Metric("request_count", random.randint(100, 10000), MetricType.COUNTER, timestamp, labels),
                    Metric("read_request_count", random.randint(50, 5000), MetricType.COUNTER, timestamp, labels),
                    Metric("write_request_count", random.randint(20, 2000), MetricType.COUNTER, timestamp, labels),
                    Metric("memstore_size_mb", random.uniform(10, 500), MetricType.GAUGE, timestamp, labels, "MB"),
                    Metric("storefile_count", random.randint(10, 1000), MetricType.GAUGE, timestamp, labels)
                ])
            
            elif node.node_type == NodeType.MASTER:
                metrics.extend([
                    Metric("table_count", random.randint(5, 100), MetricType.GAUGE, timestamp, labels),
                    Metric("region_in_transition", random.randint(0, 10), MetricType.GAUGE, timestamp, labels),
                    Metric("dead_region_servers", 0, MetricType.GAUGE, timestamp, labels)
                ])
            
            # 存储指标
            for metric in metrics:
                self._store_metric(metric)
    
    def _store_metric(self, metric: Metric):
        """存储指标"""
        with self._lock:
            metric_key = f"{metric.name}_{metric.labels.get('node_id', 'unknown')}"
            
            if metric_key not in self.metrics:
                self.metrics[metric_key] = []
            
            self.metrics[metric_key].append(metric)
            
            # 限制指标历史数量(保留最近1000个点)
            if len(self.metrics[metric_key]) > 1000:
                self.metrics[metric_key] = self.metrics[metric_key][-1000:]
    
    def _evaluate_alerts(self):
        """评估告警"""
        current_time = datetime.now()
        
        for rule_id, rule in self.alert_rules.items():
            if not rule.enabled:
                continue
            
            # 获取相关指标
            matching_metrics = self._get_metrics_for_rule(rule)
            
            for metric_key, metrics in matching_metrics.items():
                if not metrics:
                    continue
                
                latest_metric = metrics[-1]
                
                # 检查是否触发告警条件
                if self._check_alert_condition(latest_metric.value, rule):
                    self._handle_alert_trigger(rule, latest_metric, current_time)
                else:
                    self._handle_alert_resolve(rule, latest_metric, current_time)
    
    def _get_metrics_for_rule(self, rule: AlertRule) -> Dict[str, List[Metric]]:
        """获取规则相关的指标"""
        matching_metrics = {}
        
        for metric_key, metrics in self.metrics.items():
            if metrics and metrics[-1].name == rule.metric_name:
                matching_metrics[metric_key] = metrics
        
        return matching_metrics
    
    def _check_alert_condition(self, value: float, rule: AlertRule) -> bool:
        """检查告警条件"""
        if rule.condition == ">":
            return value > rule.threshold
        elif rule.condition == "<":
            return value < rule.threshold
        elif rule.condition == ">=":
            return value >= rule.threshold
        elif rule.condition == "<=":
            return value <= rule.threshold
        elif rule.condition == "==":
            return abs(value - rule.threshold) < 0.001
        elif rule.condition == "!=":
            return abs(value - rule.threshold) >= 0.001
        
        return False
    
    def _handle_alert_trigger(self, rule: AlertRule, metric: Metric, current_time: datetime):
        """处理告警触发"""
        alert_key = f"{rule.rule_id}_{metric.labels.get('node_id', 'unknown')}"
        
        if alert_key in self.active_alerts:
            # 更新现有告警
            alert = self.active_alerts[alert_key]
            alert.current_value = metric.value
        else:
            # 创建新告警
            alert = Alert(
                alert_id=str(uuid.uuid4()),
                rule_id=rule.rule_id,
                metric_name=rule.metric_name,
                current_value=metric.value,
                threshold=rule.threshold,
                level=rule.level,
                status=AlertStatus.ACTIVE,
                start_time=current_time,
                message=f"{rule.name}: {metric.name}={metric.value}{metric.unit} {rule.condition} {rule.threshold}{metric.unit}",
                labels=metric.labels.copy()
            )
            
            self.active_alerts[alert_key] = alert
            print(f"🚨 告警触发: {alert.message}")
    
    def _handle_alert_resolve(self, rule: AlertRule, metric: Metric, current_time: datetime):
        """处理告警解除"""
        alert_key = f"{rule.rule_id}_{metric.labels.get('node_id', 'unknown')}"
        
        if alert_key in self.active_alerts:
            alert = self.active_alerts[alert_key]
            alert.status = AlertStatus.RESOLVED
            alert.end_time = current_time
            
            # 移动到历史记录
            self.alert_history.append(alert)
            del self.active_alerts[alert_key]
            
            print(f"✅ 告警解除: {alert.message}")
    
    def add_alert_rule(self, rule: AlertRule) -> bool:
        """添加告警规则"""
        self.alert_rules[rule.rule_id] = rule
        print(f"告警规则已添加: {rule.name}")
        return True
    
    def remove_alert_rule(self, rule_id: str) -> bool:
        """移除告警规则"""
        if rule_id in self.alert_rules:
            del self.alert_rules[rule_id]
            print(f"告警规则已移除: {rule_id}")
            return True
        return False
    
    def acknowledge_alert(self, alert_id: str, acknowledged_by: str) -> bool:
        """确认告警"""
        for alert in self.active_alerts.values():
            if alert.alert_id == alert_id:
                alert.acknowledged = True
                alert.acknowledged_by = acknowledged_by
                alert.acknowledged_time = datetime.now()
                print(f"告警已确认: {alert.message} (by {acknowledged_by})")
                return True
        return False
    
    def get_active_alerts(self, level: AlertLevel = None) -> List[Alert]:
        """获取活跃告警"""
        alerts = list(self.active_alerts.values())
        
        if level:
            alerts = [alert for alert in alerts if alert.level == level]
        
        return sorted(alerts, key=lambda x: x.start_time, reverse=True)
    
    def get_alert_history(self, limit: int = 100) -> List[Alert]:
        """获取告警历史"""
        return sorted(self.alert_history[-limit:], key=lambda x: x.start_time, reverse=True)
    
    def get_metrics(self, metric_name: str, node_id: str = None, 
                   start_time: datetime = None, end_time: datetime = None) -> List[Metric]:
        """获取指标数据"""
        results = []
        
        for metric_key, metrics in self.metrics.items():
            # 过滤指标名称
            if not any(m.name == metric_name for m in metrics):
                continue
            
            # 过滤节点ID
            if node_id and not metric_key.endswith(node_id):
                continue
            
            # 过滤时间范围
            filtered_metrics = metrics
            if start_time or end_time:
                filtered_metrics = [
                    m for m in metrics
                    if (not start_time or m.timestamp >= start_time) and
                       (not end_time or m.timestamp <= end_time)
                ]
            
            results.extend(filtered_metrics)
        
        return sorted(results, key=lambda x: x.timestamp)
    
    def get_monitoring_summary(self) -> Dict[str, Any]:
        """获取监控摘要"""
        total_metrics = sum(len(metrics) for metrics in self.metrics.values())
        
        alert_counts = {}
        for level in AlertLevel:
            alert_counts[level.value] = len([
                alert for alert in self.active_alerts.values() 
                if alert.level == level
            ])
        
        return {
            "monitoring_enabled": self.monitoring_enabled,
            "total_metrics_collected": total_metrics,
            "active_alert_rules": len([r for r in self.alert_rules.values() if r.enabled]),
            "total_alert_rules": len(self.alert_rules),
            "active_alerts": len(self.active_alerts),
            "alert_counts_by_level": alert_counts,
            "total_alert_history": len(self.alert_history)
        }

# 监控示例
print("\n=== HBase监控示例 ===")

# 创建监控器
monitor = HBaseMonitor(cluster_manager)

print("1. 启动监控:")
monitor.start_monitoring(interval=5)  # 5秒采集间隔

# 等待收集一些数据
time.sleep(6)

print("\n2. 监控摘要:")
summary = monitor.get_monitoring_summary()
for key, value in summary.items():
    print(f"  {key}: {value}")

print("\n3. 添加自定义告警规则:")
custom_rule = AlertRule(
    rule_id="custom_request_high",
    name="请求量过高",
    metric_name="request_count",
    condition=">",
    threshold=8000.0,
    level=AlertLevel.WARNING,
    duration=120,
    description="RegionServer请求量过高"
)
monitor.add_alert_rule(custom_rule)

# 等待更多数据和可能的告警
time.sleep(3)

print("\n4. 活跃告警:")
active_alerts = monitor.get_active_alerts()
if active_alerts:
    for alert in active_alerts[:3]:  # 显示前3个告警
        print(f"  [{alert.level.value.upper()}] {alert.message}")
        print(f"    开始时间: {alert.start_time}")
        print(f"    节点: {alert.labels.get('hostname', 'unknown')}")
else:
    print("  当前无活跃告警")

print("\n5. 获取CPU使用率指标:")
cpu_metrics = monitor.get_metrics("cpu_usage_percent", limit=5)
for metric in cpu_metrics[-3:]:  # 显示最近3个数据点
    print(f"  {metric.timestamp}: {metric.labels['hostname']} = {metric.value:.1f}%")

# 停止监控
 monitor.stop_monitoring()

3. 备份与恢复

3.1 数据备份

class BackupType(Enum):
    """备份类型枚举"""
    FULL = "full"
    INCREMENTAL = "incremental"
    SNAPSHOT = "snapshot"
    EXPORT = "export"

class BackupStatus(Enum):
    """备份状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class RestoreStatus(Enum):
    """恢复状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class BackupJob:
    """备份任务数据类"""
    job_id: str
    backup_type: BackupType
    source_tables: List[str]
    destination_path: str
    status: BackupStatus
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    progress_percent: float = 0.0
    total_size_bytes: int = 0
    backed_up_size_bytes: int = 0
    error_message: Optional[str] = None
    created_by: str = "system"
    retention_days: int = 30
    compression_enabled: bool = True
    encryption_enabled: bool = False

@dataclass
class RestoreJob:
    """恢复任务数据类"""
    job_id: str
    backup_job_id: str
    source_path: str
    target_tables: List[str]
    status: RestoreStatus
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    progress_percent: float = 0.0
    total_size_bytes: int = 0
    restored_size_bytes: int = 0
    error_message: Optional[str] = None
    created_by: str = "system"
    overwrite_existing: bool = False

class HBaseBackupManager:
    """HBase备份管理器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager):
        self.cluster_manager = cluster_manager
        self.backup_jobs: Dict[str, BackupJob] = {}
        self.restore_jobs: Dict[str, RestoreJob] = {}
        self.snapshots: Dict[str, Dict[str, Any]] = {}
        self.backup_history: List[BackupJob] = []
        self.restore_history: List[RestoreJob] = []
        self._lock = threading.Lock()
    
    def create_backup(self, tables: List[str], destination_path: str, 
                     backup_type: BackupType = BackupType.FULL,
                     created_by: str = "admin") -> str:
        """创建备份任务"""
        job_id = f"backup_{int(time.time())}_{random.randint(1000, 9999)}"
        
        backup_job = BackupJob(
            job_id=job_id,
            backup_type=backup_type,
            source_tables=tables,
            destination_path=destination_path,
            status=BackupStatus.PENDING,
            created_by=created_by
        )
        
        with self._lock:
            self.backup_jobs[job_id] = backup_job
        
        # 启动备份线程
        backup_thread = threading.Thread(target=self._execute_backup, args=(job_id,))
        backup_thread.daemon = True
        backup_thread.start()
        
        print(f"备份任务已创建: {job_id}")
        return job_id
    
    def _execute_backup(self, job_id: str):
        """执行备份"""
        backup_job = self.backup_jobs[job_id]
        
        try:
            backup_job.status = BackupStatus.RUNNING
            backup_job.start_time = datetime.now()
            
            print(f"开始执行备份: {job_id}")
            
            # 模拟备份过程
            total_tables = len(backup_job.source_tables)
            
            for i, table in enumerate(backup_job.source_tables):
                # 模拟表备份
                print(f"  备份表: {table}")
                
                # 模拟备份进度
                for progress in range(0, 101, 20):
                    backup_job.progress_percent = ((i * 100) + progress) / total_tables
                    backup_job.backed_up_size_bytes = int(backup_job.total_size_bytes * backup_job.progress_percent / 100)
                    time.sleep(0.1)  # 模拟备份时间
                
                print(f"    表 {table} 备份完成")
            
            # 模拟最终大小
            backup_job.total_size_bytes = random.randint(1000000, 10000000)  # 1MB-10MB
            backup_job.backed_up_size_bytes = backup_job.total_size_bytes
            backup_job.progress_percent = 100.0
            backup_job.status = BackupStatus.COMPLETED
            backup_job.end_time = datetime.now()
            
            print(f"备份任务完成: {job_id}")
            
        except Exception as e:
            backup_job.status = BackupStatus.FAILED
            backup_job.error_message = str(e)
            backup_job.end_time = datetime.now()
            print(f"备份任务失败: {job_id} - {e}")
        
        finally:
            # 移动到历史记录
            with self._lock:
                self.backup_history.append(backup_job)
                if job_id in self.backup_jobs:
                    del self.backup_jobs[job_id]
    
    def create_snapshot(self, table_name: str, snapshot_name: str = None) -> str:
        """创建快照"""
        if not snapshot_name:
            snapshot_name = f"{table_name}_snapshot_{int(time.time())}"
        
        snapshot_info = {
            "snapshot_name": snapshot_name,
            "table_name": table_name,
            "creation_time": datetime.now(),
            "size_bytes": random.randint(100000, 5000000),
            "status": "active"
        }
        
        with self._lock:
            self.snapshots[snapshot_name] = snapshot_info
        
        print(f"快照已创建: {snapshot_name} (表: {table_name})")
        return snapshot_name
    
    def delete_snapshot(self, snapshot_name: str) -> bool:
        """删除快照"""
        with self._lock:
            if snapshot_name in self.snapshots:
                del self.snapshots[snapshot_name]
                print(f"快照已删除: {snapshot_name}")
                return True
        return False
    
    def restore_from_backup(self, backup_job_id: str, target_tables: List[str] = None,
                           overwrite_existing: bool = False, created_by: str = "admin") -> str:
        """从备份恢复"""
        # 查找备份任务
        backup_job = None
        for job in self.backup_history:
            if job.job_id == backup_job_id:
                backup_job = job
                break
        
        if not backup_job or backup_job.status != BackupStatus.COMPLETED:
            raise ValueError(f"备份任务不存在或未完成: {backup_job_id}")
        
        job_id = f"restore_{int(time.time())}_{random.randint(1000, 9999)}"
        
        restore_job = RestoreJob(
            job_id=job_id,
            backup_job_id=backup_job_id,
            source_path=backup_job.destination_path,
            target_tables=target_tables or backup_job.source_tables,
            status=RestoreStatus.PENDING,
            created_by=created_by,
            overwrite_existing=overwrite_existing,
            total_size_bytes=backup_job.total_size_bytes
        )
        
        with self._lock:
            self.restore_jobs[job_id] = restore_job
        
        # 启动恢复线程
        restore_thread = threading.Thread(target=self._execute_restore, args=(job_id,))
        restore_thread.daemon = True
        restore_thread.start()
        
        print(f"恢复任务已创建: {job_id}")
        return job_id
    
    def _execute_restore(self, job_id: str):
        """执行恢复"""
        restore_job = self.restore_jobs[job_id]
        
        try:
            restore_job.status = RestoreStatus.RUNNING
            restore_job.start_time = datetime.now()
            
            print(f"开始执行恢复: {job_id}")
            
            # 模拟恢复过程
            total_tables = len(restore_job.target_tables)
            
            for i, table in enumerate(restore_job.target_tables):
                print(f"  恢复表: {table}")
                
                # 模拟恢复进度
                for progress in range(0, 101, 25):
                    restore_job.progress_percent = ((i * 100) + progress) / total_tables
                    restore_job.restored_size_bytes = int(restore_job.total_size_bytes * restore_job.progress_percent / 100)
                    time.sleep(0.1)  # 模拟恢复时间
                
                print(f"    表 {table} 恢复完成")
            
            restore_job.restored_size_bytes = restore_job.total_size_bytes
            restore_job.progress_percent = 100.0
            restore_job.status = RestoreStatus.COMPLETED
            restore_job.end_time = datetime.now()
            
            print(f"恢复任务完成: {job_id}")
            
        except Exception as e:
            restore_job.status = RestoreStatus.FAILED
            restore_job.error_message = str(e)
            restore_job.end_time = datetime.now()
            print(f"恢复任务失败: {job_id} - {e}")
        
        finally:
            # 移动到历史记录
            with self._lock:
                self.restore_history.append(restore_job)
                if job_id in self.restore_jobs:
                    del self.restore_jobs[job_id]
    
    def cancel_backup(self, job_id: str) -> bool:
        """取消备份任务"""
        with self._lock:
            if job_id in self.backup_jobs:
                backup_job = self.backup_jobs[job_id]
                if backup_job.status in [BackupStatus.PENDING, BackupStatus.RUNNING]:
                    backup_job.status = BackupStatus.CANCELLED
                    backup_job.end_time = datetime.now()
                    print(f"备份任务已取消: {job_id}")
                    return True
        return False
    
    def cancel_restore(self, job_id: str) -> bool:
        """取消恢复任务"""
        with self._lock:
            if job_id in self.restore_jobs:
                restore_job = self.restore_jobs[job_id]
                if restore_job.status in [RestoreStatus.PENDING, RestoreStatus.RUNNING]:
                    restore_job.status = RestoreStatus.CANCELLED
                    restore_job.end_time = datetime.now()
                    print(f"恢复任务已取消: {job_id}")
                    return True
        return False
    
    def get_backup_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """获取备份状态"""
        # 检查活跃任务
        if job_id in self.backup_jobs:
            job = self.backup_jobs[job_id]
        else:
            # 检查历史记录
            job = None
            for backup_job in self.backup_history:
                if backup_job.job_id == job_id:
                    job = backup_job
                    break
        
        if not job:
            return None
        
        duration = None
        if job.start_time:
            end_time = job.end_time or datetime.now()
            duration = (end_time - job.start_time).total_seconds()
        
        return {
            "job_id": job.job_id,
            "backup_type": job.backup_type.value,
            "source_tables": job.source_tables,
            "destination_path": job.destination_path,
            "status": job.status.value,
            "progress_percent": job.progress_percent,
            "total_size_bytes": job.total_size_bytes,
            "backed_up_size_bytes": job.backed_up_size_bytes,
            "duration_seconds": duration,
            "error_message": job.error_message,
            "created_by": job.created_by
        }
    
    def get_restore_status(self, job_id: str) -> Optional[Dict[str, Any]]:
        """获取恢复状态"""
        # 检查活跃任务
        if job_id in self.restore_jobs:
            job = self.restore_jobs[job_id]
        else:
            # 检查历史记录
            job = None
            for restore_job in self.restore_history:
                if restore_job.job_id == job_id:
                    job = restore_job
                    break
        
        if not job:
            return None
        
        duration = None
        if job.start_time:
            end_time = job.end_time or datetime.now()
            duration = (end_time - job.start_time).total_seconds()
        
        return {
            "job_id": job.job_id,
            "backup_job_id": job.backup_job_id,
            "source_path": job.source_path,
            "target_tables": job.target_tables,
            "status": job.status.value,
            "progress_percent": job.progress_percent,
            "total_size_bytes": job.total_size_bytes,
            "restored_size_bytes": job.restored_size_bytes,
            "duration_seconds": duration,
            "error_message": job.error_message,
            "created_by": job.created_by,
            "overwrite_existing": job.overwrite_existing
        }
    
    def list_snapshots(self) -> List[Dict[str, Any]]:
        """列出所有快照"""
        snapshots = []
        for snapshot_name, info in self.snapshots.items():
            snapshots.append({
                "snapshot_name": snapshot_name,
                "table_name": info["table_name"],
                "creation_time": info["creation_time"].isoformat(),
                "size_bytes": info["size_bytes"],
                "status": info["status"]
            })
        
        return sorted(snapshots, key=lambda x: x["creation_time"], reverse=True)
    
    def get_backup_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """获取备份历史"""
        history = []
        for job in self.backup_history[-limit:]:
            duration = None
            if job.start_time and job.end_time:
                duration = (job.end_time - job.start_time).total_seconds()
            
            history.append({
                "job_id": job.job_id,
                "backup_type": job.backup_type.value,
                "source_tables": job.source_tables,
                "status": job.status.value,
                "start_time": job.start_time.isoformat() if job.start_time else None,
                "end_time": job.end_time.isoformat() if job.end_time else None,
                "duration_seconds": duration,
                "total_size_bytes": job.total_size_bytes,
                "created_by": job.created_by
            })
        
        return sorted(history, key=lambda x: x["start_time"] or "", reverse=True)
    
    def cleanup_old_backups(self, retention_days: int = 30) -> int:
        """清理过期备份"""
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        cleaned_count = 0
        
        # 清理备份历史
        self.backup_history = [
            job for job in self.backup_history
            if not job.end_time or job.end_time > cutoff_date
        ]
        
        # 清理快照
        expired_snapshots = [
            name for name, info in self.snapshots.items()
            if info["creation_time"] < cutoff_date
        ]
        
        for snapshot_name in expired_snapshots:
            self.delete_snapshot(snapshot_name)
            cleaned_count += 1
        
        print(f"清理了 {cleaned_count} 个过期备份/快照")
        return cleaned_count

# 备份恢复示例
print("\n=== HBase备份恢复示例 ===")

# 创建备份管理器
backup_manager = HBaseBackupManager(cluster_manager)

print("1. 创建表快照:")
snapshot1 = backup_manager.create_snapshot("user_table")
snapshot2 = backup_manager.create_snapshot("order_table", "order_backup_20240101")

print("\n2. 创建全量备份:")
backup_job_id = backup_manager.create_backup(
    tables=["user_table", "order_table", "product_table"],
    destination_path="/backup/hbase/full_backup_20240101",
    backup_type=BackupType.FULL,
    created_by="admin"
)

# 等待备份完成
time.sleep(2)

print("\n3. 检查备份状态:")
backup_status = backup_manager.get_backup_status(backup_job_id)
if backup_status:
    print(f"  任务ID: {backup_status['job_id']}")
    print(f"  状态: {backup_status['status']}")
    print(f"  进度: {backup_status['progress_percent']:.1f}%")
    print(f"  大小: {backup_status['backed_up_size_bytes']} / {backup_status['total_size_bytes']} 字节")

print("\n4. 列出快照:")
snapshots = backup_manager.list_snapshots()
for snapshot in snapshots:
    print(f"  {snapshot['snapshot_name']}: {snapshot['table_name']} ({snapshot['size_bytes']} 字节)")

print("\n5. 创建恢复任务:")
restore_job_id = backup_manager.restore_from_backup(
    backup_job_id=backup_job_id,
    target_tables=["user_table_restored"],
    overwrite_existing=False,
    created_by="admin"
)

# 等待恢复完成
time.sleep(1.5)

print("\n6. 检查恢复状态:")
restore_status = backup_manager.get_restore_status(restore_job_id)
if restore_status:
    print(f"  任务ID: {restore_status['job_id']}")
    print(f"  状态: {restore_status['status']}")
    print(f"  进度: {restore_status['progress_percent']:.1f}%")
    print(f"  源备份: {restore_status['backup_job_id']}")

print("\n7. 备份历史:")
history = backup_manager.get_backup_history(5)
for record in history:
    print(f"  {record['job_id']}: {record['status']} - {len(record['source_tables'])} 表")

print("\n8. 清理过期备份:")
cleaned = backup_manager.cleanup_old_backups(retention_days=7)
print(f"  清理了 {cleaned} 个过期项目")

4. 故障排除

4.1 故障诊断

class IssueType(Enum):
    """问题类型枚举"""
    PERFORMANCE = "performance"
    CONNECTIVITY = "connectivity"
    DATA_CORRUPTION = "data_corruption"
    RESOURCE_EXHAUSTION = "resource_exhaustion"
    CONFIGURATION = "configuration"
    HARDWARE = "hardware"
    NETWORK = "network"

class IssueSeverity(Enum):
    """问题严重程度枚举"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class IssueStatus(Enum):
    """问题状态枚举"""
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    RESOLVING = "resolving"
    RESOLVED = "resolved"
    CLOSED = "closed"

@dataclass
class DiagnosticResult:
    """诊断结果数据类"""
    check_name: str
    status: str  # "pass", "warning", "error"
    message: str
    details: Dict[str, Any] = field(default_factory=dict)
    recommendations: List[str] = field(default_factory=list)
    timestamp: datetime = field(default_factory=datetime.now)

@dataclass
class Issue:
    """问题数据类"""
    issue_id: str
    title: str
    description: str
    issue_type: IssueType
    severity: IssueSeverity
    status: IssueStatus
    affected_nodes: List[str]
    detected_time: datetime
    resolved_time: Optional[datetime] = None
    assigned_to: Optional[str] = None
    resolution_steps: List[str] = field(default_factory=list)
    diagnostic_results: List[DiagnosticResult] = field(default_factory=list)

class HBaseTroubleshooter:
    """HBase故障排除器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager, monitor: HBaseMonitor):
        self.cluster_manager = cluster_manager
        self.monitor = monitor
        self.issues: Dict[str, Issue] = {}
        self.issue_history: List[Issue] = []
        self.diagnostic_checks = self._initialize_diagnostic_checks()
    
    def _initialize_diagnostic_checks(self) -> Dict[str, callable]:
        """初始化诊断检查"""
        return {
            "cluster_health": self._check_cluster_health,
            "node_connectivity": self._check_node_connectivity,
            "resource_usage": self._check_resource_usage,
            "hbase_services": self._check_hbase_services,
            "zookeeper_health": self._check_zookeeper_health,
            "hdfs_health": self._check_hdfs_health,
            "region_distribution": self._check_region_distribution,
            "compaction_status": self._check_compaction_status,
            "memory_usage": self._check_memory_usage,
            "disk_space": self._check_disk_space
        }
    
    def run_full_diagnostic(self) -> List[DiagnosticResult]:
        """运行完整诊断"""
        print("开始运行完整系统诊断...")
        results = []
        
        for check_name, check_func in self.diagnostic_checks.items():
            try:
                print(f"  执行检查: {check_name}")
                result = check_func()
                results.append(result)
            except Exception as e:
                error_result = DiagnosticResult(
                    check_name=check_name,
                    status="error",
                    message=f"检查执行失败: {str(e)}"
                )
                results.append(error_result)
        
        # 自动检测问题
        self._auto_detect_issues(results)
        
        print("诊断完成")
        return results
    
    def _check_cluster_health(self) -> DiagnosticResult:
        """检查集群健康状态"""
        cluster_status = self.cluster_manager.get_cluster_status()
        
        total_nodes = cluster_status["total_nodes"]
        running_nodes = cluster_status["running_nodes"]
        
        if running_nodes == total_nodes:
            status = "pass"
            message = f"集群健康:所有 {total_nodes} 个节点正常运行"
        elif running_nodes >= total_nodes * 0.8:
            status = "warning"
            message = f"集群部分异常:{running_nodes}/{total_nodes} 节点运行"
        else:
            status = "error"
            message = f"集群严重异常:仅 {running_nodes}/{total_nodes} 节点运行"
        
        return DiagnosticResult(
            check_name="cluster_health",
            status=status,
            message=message,
            details=cluster_status
        )
    
    def _check_node_connectivity(self) -> DiagnosticResult:
        """检查节点连通性"""
        connectivity_issues = []
        
        for node_id, node in self.cluster_manager.nodes.items():
            if node.status != NodeStatus.RUNNING:
                connectivity_issues.append(f"{node.hostname} ({node_id})")
            elif node.last_heartbeat:
                time_since_heartbeat = (datetime.now() - node.last_heartbeat).total_seconds()
                if time_since_heartbeat > 300:  # 5分钟无心跳
                    connectivity_issues.append(f"{node.hostname} (心跳超时)")
        
        if not connectivity_issues:
            status = "pass"
            message = "所有节点连通性正常"
        else:
            status = "error"
            message = f"发现 {len(connectivity_issues)} 个连通性问题"
        
        return DiagnosticResult(
            check_name="node_connectivity",
            status=status,
            message=message,
            details={"problematic_nodes": connectivity_issues}
        )
    
    def _check_resource_usage(self) -> DiagnosticResult:
        """检查资源使用情况"""
        high_usage_nodes = []
        
        for node_id, node in self.cluster_manager.nodes.items():
            if node.status == NodeStatus.RUNNING:
                if node.cpu_usage > 90 or node.memory_usage > 90 or node.disk_usage > 90:
                    high_usage_nodes.append({
                        "node_id": node_id,
                        "hostname": node.hostname,
                        "cpu": node.cpu_usage,
                        "memory": node.memory_usage,
                        "disk": node.disk_usage
                    })
        
        if not high_usage_nodes:
            status = "pass"
            message = "所有节点资源使用正常"
        elif len(high_usage_nodes) <= 2:
            status = "warning"
            message = f"{len(high_usage_nodes)} 个节点资源使用率较高"
        else:
            status = "error"
            message = f"{len(high_usage_nodes)} 个节点资源使用率过高"
        
        recommendations = []
        if high_usage_nodes:
            recommendations.extend([
                "检查是否有异常进程占用资源",
                "考虑增加节点或升级硬件配置",
                "优化HBase配置参数"
            ])
        
        return DiagnosticResult(
            check_name="resource_usage",
            status=status,
            message=message,
            details={"high_usage_nodes": high_usage_nodes},
            recommendations=recommendations
        )
    
    def _check_hbase_services(self) -> DiagnosticResult:
        """检查HBase服务状态"""
        master_nodes = [node for node in self.cluster_manager.nodes.values() 
                      if node.node_type == NodeType.MASTER]
        regionserver_nodes = [node for node in self.cluster_manager.nodes.values() 
                            if node.node_type == NodeType.REGION_SERVER]
        
        running_masters = [node for node in master_nodes if node.status == NodeStatus.RUNNING]
        running_regionservers = [node for node in regionserver_nodes if node.status == NodeStatus.RUNNING]
        
        issues = []
        if not running_masters:
            issues.append("没有运行的HMaster节点")
        elif len(running_masters) > 1:
            issues.append(f"检测到多个活跃HMaster: {len(running_masters)}")
        
        if len(running_regionservers) < len(regionserver_nodes) * 0.8:
            issues.append(f"RegionServer节点不足: {len(running_regionservers)}/{len(regionserver_nodes)}")
        
        if not issues:
            status = "pass"
            message = "HBase服务状态正常"
        else:
            status = "error"
            message = f"HBase服务异常: {'; '.join(issues)}"
        
        return DiagnosticResult(
            check_name="hbase_services",
            status=status,
            message=message,
            details={
                "master_count": len(running_masters),
                "regionserver_count": len(running_regionservers),
                "total_regionservers": len(regionserver_nodes)
            }
        )
    
    def _check_zookeeper_health(self) -> DiagnosticResult:
        """检查ZooKeeper健康状态"""
        zk_nodes = [node for node in self.cluster_manager.nodes.values() 
                   if node.node_type == NodeType.ZOOKEEPER]
        running_zk_nodes = [node for node in zk_nodes if node.status == NodeStatus.RUNNING]
        
        if len(running_zk_nodes) >= len(zk_nodes) // 2 + 1:  # 过半数运行
            status = "pass"
            message = f"ZooKeeper集群正常: {len(running_zk_nodes)}/{len(zk_nodes)} 节点运行"
        else:
            status = "error"
            message = f"ZooKeeper集群异常: 仅 {len(running_zk_nodes)}/{len(zk_nodes)} 节点运行"
        
        return DiagnosticResult(
            check_name="zookeeper_health",
            status=status,
            message=message,
            details={
                "running_nodes": len(running_zk_nodes),
                "total_nodes": len(zk_nodes),
                "quorum_size": len(zk_nodes) // 2 + 1
            }
        )
    
    def _check_hdfs_health(self) -> DiagnosticResult:
        """检查HDFS健康状态"""
        namenode_nodes = [node for node in self.cluster_manager.nodes.values() 
                         if node.node_type == NodeType.HDFS_NAMENODE]
        datanode_nodes = [node for node in self.cluster_manager.nodes.values() 
                         if node.node_type == NodeType.HDFS_DATANODE]
        
        running_namenodes = [node for node in namenode_nodes if node.status == NodeStatus.RUNNING]
        running_datanodes = [node for node in datanode_nodes if node.status == NodeStatus.RUNNING]
        
        issues = []
        if not running_namenodes:
            issues.append("没有运行的NameNode")
        
        if len(running_datanodes) < len(datanode_nodes) * 0.7:
            issues.append(f"DataNode节点不足: {len(running_datanodes)}/{len(datanode_nodes)}")
        
        if not issues:
            status = "pass"
            message = "HDFS状态正常"
        else:
            status = "error"
            message = f"HDFS异常: {'; '.join(issues)}"
        
        return DiagnosticResult(
            check_name="hdfs_health",
            status=status,
            message=message,
            details={
                "namenode_count": len(running_namenodes),
                "datanode_count": len(running_datanodes),
                "total_datanodes": len(datanode_nodes)
            }
        )
    
    def _check_region_distribution(self) -> DiagnosticResult:
        """检查Region分布"""
        regionserver_nodes = [node for node in self.cluster_manager.nodes.values() 
                            if node.node_type == NodeType.REGION_SERVER and node.status == NodeStatus.RUNNING]
        
        if not regionserver_nodes:
            return DiagnosticResult(
                check_name="region_distribution",
                status="error",
                message="没有运行的RegionServer"
            )
        
        # 模拟Region分布检查
        region_counts = [random.randint(50, 1200) for _ in regionserver_nodes]
        avg_regions = sum(region_counts) / len(region_counts)
        max_regions = max(region_counts)
        min_regions = min(region_counts)
        
        imbalance_ratio = (max_regions - min_regions) / avg_regions if avg_regions > 0 else 0
        
        if imbalance_ratio < 0.2:  # 20%以内的不平衡
            status = "pass"
            message = f"Region分布均衡 (不平衡率: {imbalance_ratio:.1%})"
        elif imbalance_ratio < 0.5:
            status = "warning"
            message = f"Region分布轻微不均衡 (不平衡率: {imbalance_ratio:.1%})"
        else:
            status = "error"
            message = f"Region分布严重不均衡 (不平衡率: {imbalance_ratio:.1%})"
        
        recommendations = []
        if imbalance_ratio > 0.2:
            recommendations.extend([
                "运行Region平衡器",
                "检查热点数据分布",
                "考虑调整表的预分区策略"
            ])
        
        return DiagnosticResult(
            check_name="region_distribution",
            status=status,
            message=message,
            details={
                "avg_regions": avg_regions,
                "max_regions": max_regions,
                "min_regions": min_regions,
                "imbalance_ratio": imbalance_ratio
            },
            recommendations=recommendations
        )
    
    def _check_compaction_status(self) -> DiagnosticResult:
        """检查压缩状态"""
        # 模拟压缩状态检查
        pending_compactions = random.randint(0, 50)
        running_compactions = random.randint(0, 10)
        
        if pending_compactions < 20 and running_compactions < 5:
            status = "pass"
            message = "压缩状态正常"
        elif pending_compactions < 50:
            status = "warning"
            message = f"压缩队列较长: {pending_compactions} 个待处理"
        else:
            status = "error"
            message = f"压缩队列过长: {pending_compactions} 个待处理"
        
        return DiagnosticResult(
            check_name="compaction_status",
            status=status,
            message=message,
            details={
                "pending_compactions": pending_compactions,
                "running_compactions": running_compactions
            }
        )
    
    def _check_memory_usage(self) -> DiagnosticResult:
        """检查内存使用"""
        high_memory_nodes = []
        
        for node_id, node in self.cluster_manager.nodes.items():
            if node.status == NodeStatus.RUNNING and node.memory_usage > 85:
                high_memory_nodes.append({
                    "node_id": node_id,
                    "hostname": node.hostname,
                    "memory_usage": node.memory_usage
                })
        
        if not high_memory_nodes:
            status = "pass"
            message = "内存使用正常"
        elif len(high_memory_nodes) <= 2:
            status = "warning"
            message = f"{len(high_memory_nodes)} 个节点内存使用率较高"
        else:
            status = "error"
            message = f"{len(high_memory_nodes)} 个节点内存使用率过高"
        
        return DiagnosticResult(
            check_name="memory_usage",
            status=status,
            message=message,
            details={"high_memory_nodes": high_memory_nodes}
        )
    
    def _check_disk_space(self) -> DiagnosticResult:
        """检查磁盘空间"""
        low_disk_nodes = []
        
        for node_id, node in self.cluster_manager.nodes.items():
            if node.status == NodeStatus.RUNNING and node.disk_usage > 80:
                low_disk_nodes.append({
                    "node_id": node_id,
                    "hostname": node.hostname,
                    "disk_usage": node.disk_usage
                })
        
        if not low_disk_nodes:
            status = "pass"
            message = "磁盘空间充足"
        elif len(low_disk_nodes) <= 1:
            status = "warning"
            message = f"{len(low_disk_nodes)} 个节点磁盘空间不足"
        else:
            status = "error"
            message = f"{len(low_disk_nodes)} 个节点磁盘空间严重不足"
        
        recommendations = []
        if low_disk_nodes:
            recommendations.extend([
                "清理日志文件",
                "运行压缩以减少存储空间",
                "考虑增加存储容量"
            ])
        
        return DiagnosticResult(
            check_name="disk_space",
            status=status,
            message=message,
            details={"low_disk_nodes": low_disk_nodes},
            recommendations=recommendations
        )
    
    def _auto_detect_issues(self, diagnostic_results: List[DiagnosticResult]):
        """自动检测问题"""
        for result in diagnostic_results:
            if result.status == "error":
                issue_id = f"issue_{int(time.time())}_{random.randint(1000, 9999)}"
                
                # 确定问题类型和严重程度
                issue_type = self._determine_issue_type(result.check_name)
                severity = self._determine_severity(result)
                
                issue = Issue(
                    issue_id=issue_id,
                    title=f"{result.check_name} 检查失败",
                    description=result.message,
                    issue_type=issue_type,
                    severity=severity,
                    status=IssueStatus.DETECTED,
                    affected_nodes=self._extract_affected_nodes(result),
                    detected_time=datetime.now(),
                    diagnostic_results=[result]
                )
                
                self.issues[issue_id] = issue
                print(f"🔍 检测到问题: {issue.title} (严重程度: {severity.value})")
    
    def _determine_issue_type(self, check_name: str) -> IssueType:
        """确定问题类型"""
        type_mapping = {
            "cluster_health": IssueType.CONNECTIVITY,
            "node_connectivity": IssueType.CONNECTIVITY,
            "resource_usage": IssueType.RESOURCE_EXHAUSTION,
            "hbase_services": IssueType.CONFIGURATION,
            "zookeeper_health": IssueType.CONNECTIVITY,
            "hdfs_health": IssueType.CONNECTIVITY,
            "region_distribution": IssueType.PERFORMANCE,
            "compaction_status": IssueType.PERFORMANCE,
            "memory_usage": IssueType.RESOURCE_EXHAUSTION,
            "disk_space": IssueType.RESOURCE_EXHAUSTION
        }
        return type_mapping.get(check_name, IssueType.CONFIGURATION)
    
    def _determine_severity(self, result: DiagnosticResult) -> IssueSeverity:
        """确定严重程度"""
        if "严重" in result.message or "critical" in result.message.lower():
            return IssueSeverity.CRITICAL
        elif "异常" in result.message or "error" in result.message.lower():
            return IssueSeverity.HIGH
        elif "警告" in result.message or "warning" in result.message.lower():
            return IssueSeverity.MEDIUM
        else:
            return IssueSeverity.LOW
    
    def _extract_affected_nodes(self, result: DiagnosticResult) -> List[str]:
        """提取受影响的节点"""
        affected_nodes = []
        
        if "details" in result.details:
            details = result.details["details"]
            if "problematic_nodes" in details:
                affected_nodes.extend(details["problematic_nodes"])
            elif "high_usage_nodes" in details:
                affected_nodes.extend([node["hostname"] for node in details["high_usage_nodes"]])
            elif "low_disk_nodes" in details:
                affected_nodes.extend([node["hostname"] for node in details["low_disk_nodes"]])
        
        return affected_nodes
    
    def create_issue(self, title: str, description: str, issue_type: IssueType,
                    severity: IssueSeverity, affected_nodes: List[str] = None) -> str:
        """手动创建问题"""
        issue_id = f"manual_{int(time.time())}_{random.randint(1000, 9999)}"
        
        issue = Issue(
            issue_id=issue_id,
            title=title,
            description=description,
            issue_type=issue_type,
            severity=severity,
            status=IssueStatus.DETECTED,
            affected_nodes=affected_nodes or [],
            detected_time=datetime.now()
        )
        
        self.issues[issue_id] = issue
        print(f"问题已创建: {issue_id}")
        return issue_id
    
    def update_issue_status(self, issue_id: str, status: IssueStatus, 
                           assigned_to: str = None) -> bool:
        """更新问题状态"""
        if issue_id in self.issues:
            issue = self.issues[issue_id]
            issue.status = status
            
            if assigned_to:
                issue.assigned_to = assigned_to
            
            if status == IssueStatus.RESOLVED:
                issue.resolved_time = datetime.now()
            
            print(f"问题状态已更新: {issue_id} -> {status.value}")
            return True
        return False
    
    def add_resolution_step(self, issue_id: str, step: str) -> bool:
        """添加解决步骤"""
        if issue_id in self.issues:
            self.issues[issue_id].resolution_steps.append(step)
            print(f"解决步骤已添加到问题 {issue_id}: {step}")
            return True
        return False
    
    def get_active_issues(self, severity: IssueSeverity = None) -> List[Issue]:
        """获取活跃问题"""
        active_issues = [
            issue for issue in self.issues.values()
            if issue.status not in [IssueStatus.RESOLVED, IssueStatus.CLOSED]
        ]
        
        if severity:
            active_issues = [issue for issue in active_issues if issue.severity == severity]
        
        return sorted(active_issues, key=lambda x: (x.severity.value, x.detected_time), reverse=True)
    
    def get_issue_summary(self) -> Dict[str, Any]:
        """获取问题摘要"""
        active_issues = self.get_active_issues()
        
        severity_counts = {}
        for severity in IssueSeverity:
            severity_counts[severity.value] = len([
                issue for issue in active_issues if issue.severity == severity
            ])
        
        type_counts = {}
        for issue_type in IssueType:
            type_counts[issue_type.value] = len([
                issue for issue in active_issues if issue.issue_type == issue_type
            ])
        
        return {
            "total_active_issues": len(active_issues),
            "severity_distribution": severity_counts,
            "type_distribution": type_counts,
            "total_issues_ever": len(self.issues) + len(self.issue_history)
        }

# 故障排除示例
print("\n=== HBase故障排除示例 ===")

# 创建故障排除器
troubleshooter = HBaseTroubleshooter(cluster_manager, monitor)

print("1. 运行完整诊断:")
diagnostic_results = troubleshooter.run_full_diagnostic()

print("\n2. 诊断结果摘要:")
for result in diagnostic_results:
    status_icon = {"pass": "✅", "warning": "⚠️", "error": "❌"}.get(result.status, "❓")
    print(f"  {status_icon} {result.check_name}: {result.message}")

print("\n3. 手动创建问题:")
issue_id = troubleshooter.create_issue(
    title="RegionServer响应缓慢",
    description="用户报告查询响应时间超过10秒",
    issue_type=IssueType.PERFORMANCE,
    severity=IssueSeverity.HIGH,
    affected_nodes=["regionserver1.example.com"]
)

print("\n4. 活跃问题列表:")
active_issues = troubleshooter.get_active_issues()
for issue in active_issues:
    print(f"  [{issue.severity.value.upper()}] {issue.title}")
    print(f"    ID: {issue.issue_id}")
    print(f"    类型: {issue.issue_type.value}")
    print(f"    状态: {issue.status.value}")
    print(f"    检测时间: {issue.detected_time}")
    if issue.affected_nodes:
        print(f"    受影响节点: {', '.join(issue.affected_nodes)}")
    print()

print("\n5. 添加解决步骤:")
troubleshooter.add_resolution_step(issue_id, "检查RegionServer日志")
troubleshooter.add_resolution_step(issue_id, "分析慢查询")
troubleshooter.add_resolution_step(issue_id, "优化表结构")

print("\n6. 更新问题状态:")
troubleshooter.update_issue_status(issue_id, IssueStatus.INVESTIGATING, "admin")
troubleshooter.update_issue_status(issue_id, IssueStatus.RESOLVED)

print("\n7. 问题摘要:")
summary = troubleshooter.get_issue_summary()
for key, value in summary.items():
    print(f"  {key}: {value}")

5. 运维最佳实践

5.1 自动化运维

class AutomationLevel(Enum):
    """自动化级别枚举"""
    MANUAL = "manual"
    SEMI_AUTOMATIC = "semi_automatic"
    FULLY_AUTOMATIC = "fully_automatic"

class TaskType(Enum):
    """任务类型枚举"""
    BACKUP = "backup"
    MONITORING = "monitoring"
    SCALING = "scaling"
    MAINTENANCE = "maintenance"
    RECOVERY = "recovery"
    OPTIMIZATION = "optimization"

class TaskStatus(Enum):
    """任务状态枚举"""
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class AutomationTask:
    """自动化任务数据类"""
    task_id: str
    task_type: TaskType
    name: str
    description: str
    automation_level: AutomationLevel
    schedule_cron: Optional[str] = None
    status: TaskStatus = TaskStatus.SCHEDULED
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    run_count: int = 0
    success_count: int = 0
    failure_count: int = 0
    average_duration: float = 0.0
    enabled: bool = True
    parameters: Dict[str, Any] = field(default_factory=dict)
    dependencies: List[str] = field(default_factory=list)

class HBaseAutomationManager:
    """HBase自动化管理器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager, 
                 backup_manager: HBaseBackupManager,
                 monitor: HBaseMonitor,
                 troubleshooter: HBaseTroubleshooter):
        self.cluster_manager = cluster_manager
        self.backup_manager = backup_manager
        self.monitor = monitor
        self.troubleshooter = troubleshooter
        self.tasks: Dict[str, AutomationTask] = {}
        self.task_history: List[Dict[str, Any]] = []
        self.running_tasks: Set[str] = set()
        self._scheduler_running = False
        self._lock = threading.Lock()
    
    def create_task(self, name: str, task_type: TaskType, description: str,
                   automation_level: AutomationLevel = AutomationLevel.SEMI_AUTOMATIC,
                   schedule_cron: str = None, parameters: Dict[str, Any] = None) -> str:
        """创建自动化任务"""
        task_id = f"task_{int(time.time())}_{random.randint(1000, 9999)}"
        
        task = AutomationTask(
            task_id=task_id,
            task_type=task_type,
            name=name,
            description=description,
            automation_level=automation_level,
            schedule_cron=schedule_cron,
            parameters=parameters or {}
        )
        
        # 计算下次运行时间
        if schedule_cron:
            task.next_run = self._calculate_next_run(schedule_cron)
        
        with self._lock:
            self.tasks[task_id] = task
        
        print(f"自动化任务已创建: {task_id} - {name}")
        return task_id
    
    def _calculate_next_run(self, cron_expression: str) -> datetime:
        """计算下次运行时间(简化版cron解析)"""
        # 简化的cron解析,实际应用中应使用专业的cron库
        now = datetime.now()
        
        if cron_expression == "@daily":
            next_run = now.replace(hour=2, minute=0, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(days=1)
        elif cron_expression == "@hourly":
            next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        elif cron_expression == "@weekly":
            days_ahead = 6 - now.weekday()  # 周日
            next_run = (now + timedelta(days=days_ahead)).replace(hour=3, minute=0, second=0, microsecond=0)
            if next_run <= now:
                next_run += timedelta(weeks=1)
        else:
            # 默认每小时运行
            next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
        
        return next_run
    
    def start_scheduler(self):
        """启动任务调度器"""
        if self._scheduler_running:
            print("调度器已在运行")
            return
        
        self._scheduler_running = True
        scheduler_thread = threading.Thread(target=self._scheduler_loop)
        scheduler_thread.daemon = True
        scheduler_thread.start()
        print("任务调度器已启动")
    
    def stop_scheduler(self):
        """停止任务调度器"""
        self._scheduler_running = False
        print("任务调度器已停止")
    
    def _scheduler_loop(self):
        """调度器主循环"""
        while self._scheduler_running:
            try:
                current_time = datetime.now()
                
                for task_id, task in list(self.tasks.items()):
                    if (task.enabled and 
                        task.next_run and 
                        task.next_run <= current_time and 
                        task_id not in self.running_tasks):
                        
                        # 执行任务
                        self._execute_task(task_id)
                
                time.sleep(60)  # 每分钟检查一次
                
            except Exception as e:
                print(f"调度器错误: {e}")
                time.sleep(60)
    
    def _execute_task(self, task_id: str):
        """执行任务"""
        task = self.tasks.get(task_id)
        if not task:
            return
        
        with self._lock:
            self.running_tasks.add(task_id)
        
        # 启动任务执行线程
        task_thread = threading.Thread(target=self._run_task, args=(task_id,))
        task_thread.daemon = True
        task_thread.start()
    
    def _run_task(self, task_id: str):
        """运行任务"""
        task = self.tasks[task_id]
        start_time = datetime.now()
        
        try:
            task.status = TaskStatus.RUNNING
            task.last_run = start_time
            task.run_count += 1
            
            print(f"开始执行任务: {task.name} ({task_id})")
            
            # 根据任务类型执行相应操作
            if task.task_type == TaskType.BACKUP:
                self._execute_backup_task(task)
            elif task.task_type == TaskType.MONITORING:
                self._execute_monitoring_task(task)
            elif task.task_type == TaskType.SCALING:
                self._execute_scaling_task(task)
            elif task.task_type == TaskType.MAINTENANCE:
                self._execute_maintenance_task(task)
            elif task.task_type == TaskType.RECOVERY:
                self._execute_recovery_task(task)
            elif task.task_type == TaskType.OPTIMIZATION:
                self._execute_optimization_task(task)
            
            task.status = TaskStatus.COMPLETED
            task.success_count += 1
            
            # 更新平均执行时间
            duration = (datetime.now() - start_time).total_seconds()
            task.average_duration = ((task.average_duration * (task.success_count - 1)) + duration) / task.success_count
            
            print(f"任务执行完成: {task.name} (耗时: {duration:.1f}秒)")
            
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.failure_count += 1
            print(f"任务执行失败: {task.name} - {e}")
        
        finally:
            # 计算下次运行时间
            if task.schedule_cron:
                task.next_run = self._calculate_next_run(task.schedule_cron)
            
            # 记录任务历史
            self.task_history.append({
                "task_id": task_id,
                "name": task.name,
                "task_type": task.task_type.value,
                "status": task.status.value,
                "start_time": start_time.isoformat(),
                "end_time": datetime.now().isoformat(),
                "duration": (datetime.now() - start_time).total_seconds()
            })
            
            with self._lock:
                self.running_tasks.discard(task_id)
    
    def _execute_backup_task(self, task: AutomationTask):
        """执行备份任务"""
        tables = task.parameters.get("tables", ["user_table", "order_table"])
        backup_type = BackupType(task.parameters.get("backup_type", "full"))
        destination = task.parameters.get("destination", f"/backup/auto_{int(time.time())}")
        
        backup_job_id = self.backup_manager.create_backup(
            tables=tables,
            destination_path=destination,
            backup_type=backup_type,
            created_by="automation"
        )
        
        # 等待备份完成
        while True:
            status = self.backup_manager.get_backup_status(backup_job_id)
            if status and status["status"] in ["completed", "failed", "cancelled"]:
                break
            time.sleep(5)
        
        if status["status"] != "completed":
            raise Exception(f"备份失败: {status.get('error_message', '未知错误')}")
    
    def _execute_monitoring_task(self, task: AutomationTask):
        """执行监控任务"""
        # 运行诊断检查
        results = self.troubleshooter.run_full_diagnostic()
        
        # 检查是否有严重问题
        critical_issues = [r for r in results if r.status == "error"]
        if critical_issues:
            print(f"发现 {len(critical_issues)} 个严重问题,需要关注")
        
        # 收集性能指标
        cluster_status = self.cluster_manager.get_cluster_status()
        print(f"集群状态: {cluster_status['running_nodes']}/{cluster_status['total_nodes']} 节点运行")
    
    def _execute_scaling_task(self, task: AutomationTask):
        """执行扩缩容任务"""
        target_nodes = task.parameters.get("target_nodes", 5)
        current_nodes = len([n for n in self.cluster_manager.nodes.values() 
                           if n.status == NodeStatus.RUNNING])
        
        if current_nodes < target_nodes:
            # 需要扩容
            for i in range(target_nodes - current_nodes):
                node_id = f"auto_node_{int(time.time())}_{i}"
                self.cluster_manager.add_node(
                    node_id=node_id,
                    hostname=f"auto-{i}.example.com",
                    node_type=NodeType.REGION_SERVER
                )
                self.cluster_manager.start_node(node_id)
                print(f"自动添加节点: {node_id}")
        elif current_nodes > target_nodes:
            # 需要缩容(简化处理)
            print(f"当前节点数({current_nodes})超过目标({target_nodes}),建议手动缩容")
    
    def _execute_maintenance_task(self, task: AutomationTask):
        """执行维护任务"""
        maintenance_type = task.parameters.get("type", "cleanup")
        
        if maintenance_type == "cleanup":
            # 清理过期备份
            retention_days = task.parameters.get("retention_days", 30)
            cleaned = self.backup_manager.cleanup_old_backups(retention_days)
            print(f"清理维护: 删除了 {cleaned} 个过期备份")
        
        elif maintenance_type == "compaction":
            # 模拟触发压缩
            print("触发主要压缩操作")
            time.sleep(2)  # 模拟压缩时间
        
        elif maintenance_type == "balance":
            # 模拟Region平衡
            print("触发Region平衡操作")
            time.sleep(3)  # 模拟平衡时间
    
    def _execute_recovery_task(self, task: AutomationTask):
        """执行恢复任务"""
        # 检查是否有失败的节点需要恢复
        failed_nodes = [node_id for node_id, node in self.cluster_manager.nodes.items() 
                       if node.status == NodeStatus.FAILED]
        
        for node_id in failed_nodes:
            try:
                self.cluster_manager.restart_node(node_id)
                print(f"自动恢复节点: {node_id}")
            except Exception as e:
                print(f"节点恢复失败 {node_id}: {e}")
    
    def _execute_optimization_task(self, task: AutomationTask):
        """执行优化任务"""
        optimization_type = task.parameters.get("type", "performance")
        
        if optimization_type == "performance":
            # 性能优化检查
            print("执行性能优化检查")
            
            # 检查Region分布
            results = self.troubleshooter.run_full_diagnostic()
            region_result = next((r for r in results if r.check_name == "region_distribution"), None)
            
            if region_result and region_result.status in ["warning", "error"]:
                print("检测到Region分布不均,建议运行平衡器")
        
        elif optimization_type == "storage":
            # 存储优化
            print("执行存储优化")
            # 模拟存储优化操作
            time.sleep(1)
    
    def run_task_now(self, task_id: str) -> bool:
        """立即运行任务"""
        if task_id not in self.tasks:
            return False
        
        if task_id in self.running_tasks:
            print(f"任务 {task_id} 正在运行中")
            return False
        
        self._execute_task(task_id)
        return True
    
    def enable_task(self, task_id: str) -> bool:
        """启用任务"""
        if task_id in self.tasks:
            self.tasks[task_id].enabled = True
            print(f"任务已启用: {task_id}")
            return True
        return False
    
    def disable_task(self, task_id: str) -> bool:
        """禁用任务"""
        if task_id in self.tasks:
            self.tasks[task_id].enabled = False
            print(f"任务已禁用: {task_id}")
            return True
        return False
    
    def delete_task(self, task_id: str) -> bool:
        """删除任务"""
        with self._lock:
            if task_id in self.tasks:
                del self.tasks[task_id]
                print(f"任务已删除: {task_id}")
                return True
        return False
    
    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """获取任务状态"""
        task = self.tasks.get(task_id)
        if not task:
            return None
        
        return {
            "task_id": task.task_id,
            "name": task.name,
            "task_type": task.task_type.value,
            "automation_level": task.automation_level.value,
            "status": task.status.value,
            "enabled": task.enabled,
            "schedule_cron": task.schedule_cron,
            "last_run": task.last_run.isoformat() if task.last_run else None,
            "next_run": task.next_run.isoformat() if task.next_run else None,
            "run_count": task.run_count,
            "success_count": task.success_count,
            "failure_count": task.failure_count,
            "success_rate": task.success_count / task.run_count if task.run_count > 0 else 0,
            "average_duration": task.average_duration,
            "is_running": task_id in self.running_tasks
        }
    
    def list_tasks(self, task_type: TaskType = None, enabled_only: bool = False) -> List[Dict[str, Any]]:
        """列出任务"""
        tasks = []
        
        for task in self.tasks.values():
            if task_type and task.task_type != task_type:
                continue
            if enabled_only and not task.enabled:
                continue
            
            task_info = self.get_task_status(task.task_id)
            if task_info:
                tasks.append(task_info)
        
        return sorted(tasks, key=lambda x: x["name"])
    
    def get_automation_summary(self) -> Dict[str, Any]:
        """获取自动化摘要"""
        total_tasks = len(self.tasks)
        enabled_tasks = len([t for t in self.tasks.values() if t.enabled])
        running_tasks = len(self.running_tasks)
        
        # 按类型统计
        type_counts = {}
        for task_type in TaskType:
            type_counts[task_type.value] = len([
                t for t in self.tasks.values() if t.task_type == task_type
            ])
        
        # 成功率统计
        total_runs = sum(t.run_count for t in self.tasks.values())
        total_successes = sum(t.success_count for t in self.tasks.values())
        overall_success_rate = total_successes / total_runs if total_runs > 0 else 0
        
        return {
            "total_tasks": total_tasks,
            "enabled_tasks": enabled_tasks,
            "running_tasks": running_tasks,
            "scheduler_running": self._scheduler_running,
            "task_type_distribution": type_counts,
            "total_runs": total_runs,
            "overall_success_rate": overall_success_rate,
            "recent_history_count": len(self.task_history[-50:])
        }

# 自动化运维示例
print("\n=== HBase自动化运维示例 ===")

# 创建自动化管理器
automation_manager = HBaseAutomationManager(
    cluster_manager, backup_manager, monitor, troubleshooter
)

print("1. 创建自动化任务:")

# 每日备份任务
backup_task_id = automation_manager.create_task(
    name="每日全量备份",
    task_type=TaskType.BACKUP,
    description="每天凌晨2点执行全量备份",
    automation_level=AutomationLevel.FULLY_AUTOMATIC,
    schedule_cron="@daily",
    parameters={
        "tables": ["user_table", "order_table", "product_table"],
        "backup_type": "full",
        "destination": "/backup/daily"
    }
)

# 每小时监控任务
monitoring_task_id = automation_manager.create_task(
    name="系统健康监控",
    task_type=TaskType.MONITORING,
    description="每小时检查系统健康状态",
    automation_level=AutomationLevel.FULLY_AUTOMATIC,
    schedule_cron="@hourly"
)

# 维护任务
maintenance_task_id = automation_manager.create_task(
    name="周末维护清理",
    task_type=TaskType.MAINTENANCE,
    description="每周日执行系统维护",
    automation_level=AutomationLevel.SEMI_AUTOMATIC,
    schedule_cron="@weekly",
    parameters={
        "type": "cleanup",
        "retention_days": 30
    }
)

print("\n2. 启动任务调度器:")
automation_manager.start_scheduler()

print("\n3. 立即运行监控任务:")
automation_manager.run_task_now(monitoring_task_id)
time.sleep(2)

print("\n4. 任务列表:")
tasks = automation_manager.list_tasks()
for task in tasks:
    print(f"  {task['name']} ({task['task_type']})")
    print(f"    状态: {task['status']} | 启用: {task['enabled']} | 运行中: {task['is_running']}")
    print(f"    成功率: {task['success_rate']:.1%} ({task['success_count']}/{task['run_count']})")
    if task['next_run']:
        print(f"    下次运行: {task['next_run']}")
    print()

print("\n5. 自动化摘要:")
summary = automation_manager.get_automation_summary()
for key, value in summary.items():
    print(f"  {key}: {value}")

print("\n6. 停止调度器:")
automation_manager.stop_scheduler()

5.2 容量规划

class ResourceType(Enum):
    """资源类型枚举"""
    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"
    NETWORK = "network"
    REGIONS = "regions"

class GrowthPattern(Enum):
    """增长模式枚举"""
    LINEAR = "linear"
    EXPONENTIAL = "exponential"
    SEASONAL = "seasonal"
    CUSTOM = "custom"

@dataclass
class CapacityMetric:
    """容量指标数据类"""
    resource_type: ResourceType
    current_usage: float
    current_capacity: float
    utilization_percent: float
    growth_rate_percent: float
    growth_pattern: GrowthPattern
    projected_usage_30d: float
    projected_usage_90d: float
    projected_usage_1y: float
    threshold_warning: float = 80.0
    threshold_critical: float = 90.0

@dataclass
class CapacityRecommendation:
    """容量建议数据类"""
    resource_type: ResourceType
    current_status: str  # "healthy", "warning", "critical"
    recommendation: str
    action_required: bool
    timeline_days: int
    estimated_cost: Optional[float] = None
    priority: str = "medium"  # "low", "medium", "high", "urgent"

class HBaseCapacityPlanner:
    """HBase容量规划器"""
    
    def __init__(self, cluster_manager: HBaseClusterManager, monitor: HBaseMonitor):
        self.cluster_manager = cluster_manager
        self.monitor = monitor
        self.historical_data: Dict[ResourceType, List[Dict[str, Any]]] = {
            resource_type: [] for resource_type in ResourceType
        }
        self.capacity_metrics: Dict[ResourceType, CapacityMetric] = {}
        self.recommendations: List[CapacityRecommendation] = []
    
    def collect_capacity_data(self):
        """收集容量数据"""
        print("收集容量数据...")
        
        # 收集CPU数据
        cpu_usage = sum(node.cpu_usage for node in self.cluster_manager.nodes.values() 
                       if node.status == NodeStatus.RUNNING)
        cpu_capacity = len([node for node in self.cluster_manager.nodes.values() 
                          if node.status == NodeStatus.RUNNING]) * 100
        
        self._record_metric(ResourceType.CPU, cpu_usage, cpu_capacity)
        
        # 收集内存数据
        memory_usage = sum(node.memory_usage for node in self.cluster_manager.nodes.values() 
                          if node.status == NodeStatus.RUNNING)
        memory_capacity = len([node for node in self.cluster_manager.nodes.values() 
                             if node.status == NodeStatus.RUNNING]) * 100
        
        self._record_metric(ResourceType.MEMORY, memory_usage, memory_capacity)
        
        # 收集磁盘数据
        disk_usage = sum(node.disk_usage for node in self.cluster_manager.nodes.values() 
                        if node.status == NodeStatus.RUNNING)
        disk_capacity = len([node for node in self.cluster_manager.nodes.values() 
                           if node.status == NodeStatus.RUNNING]) * 100
        
        self._record_metric(ResourceType.DISK, disk_usage, disk_capacity)
        
        # 收集Region数据
        region_count = random.randint(1000, 5000)  # 模拟Region数量
        max_regions = len([node for node in self.cluster_manager.nodes.values() 
                         if node.node_type == NodeType.REGION_SERVER and node.status == NodeStatus.RUNNING]) * 1000
        
        self._record_metric(ResourceType.REGIONS, region_count, max_regions)
        
        print("容量数据收集完成")
    
    def _record_metric(self, resource_type: ResourceType, usage: float, capacity: float):
        """记录指标"""
        timestamp = datetime.now()
        utilization = (usage / capacity * 100) if capacity > 0 else 0
        
        # 记录历史数据
        self.historical_data[resource_type].append({
            "timestamp": timestamp,
            "usage": usage,
            "capacity": capacity,
            "utilization": utilization
        })
        
        # 保留最近30天的数据
        cutoff_date = timestamp - timedelta(days=30)
        self.historical_data[resource_type] = [
            record for record in self.historical_data[resource_type]
            if record["timestamp"] > cutoff_date
        ]
    
    def analyze_capacity_trends(self) -> Dict[ResourceType, CapacityMetric]:
        """分析容量趋势"""
        print("分析容量趋势...")
        
        for resource_type in ResourceType:
            data = self.historical_data[resource_type]
            
            if len(data) < 2:
                continue
            
            # 计算增长率
            recent_data = data[-7:]  # 最近7天
            if len(recent_data) >= 2:
                start_usage = recent_data[0]["usage"]
                end_usage = recent_data[-1]["usage"]
                days_diff = (recent_data[-1]["timestamp"] - recent_data[0]["timestamp"]).days
                
                if days_diff > 0 and start_usage > 0:
                    daily_growth_rate = ((end_usage / start_usage) ** (1/days_diff) - 1) * 100
                else:
                    daily_growth_rate = 0
            else:
                daily_growth_rate = 0
            
            # 当前指标
            current_record = data[-1]
            current_usage = current_record["usage"]
            current_capacity = current_record["capacity"]
            utilization = current_record["utilization"]
            
            # 预测未来使用量
            projected_30d = self._project_usage(current_usage, daily_growth_rate, 30)
            projected_90d = self._project_usage(current_usage, daily_growth_rate, 90)
            projected_1y = self._project_usage(current_usage, daily_growth_rate, 365)
            
            # 确定增长模式
            growth_pattern = self._determine_growth_pattern(data)
            
            metric = CapacityMetric(
                resource_type=resource_type,
                current_usage=current_usage,
                current_capacity=current_capacity,
                utilization_percent=utilization,
                growth_rate_percent=daily_growth_rate,
                growth_pattern=growth_pattern,
                projected_usage_30d=projected_30d,
                projected_usage_90d=projected_90d,
                projected_usage_1y=projected_1y
            )
            
            self.capacity_metrics[resource_type] = metric
        
        print("容量趋势分析完成")
        return self.capacity_metrics
    
    def _project_usage(self, current_usage: float, daily_growth_rate: float, days: int) -> float:
        """预测使用量"""
        if daily_growth_rate == 0:
            return current_usage
        
        # 简单的复合增长模型
        return current_usage * ((1 + daily_growth_rate / 100) ** days)
    
    def _determine_growth_pattern(self, data: List[Dict[str, Any]]) -> GrowthPattern:
        """确定增长模式"""
        if len(data) < 7:
            return GrowthPattern.LINEAR
        
        # 简化的模式识别
        usage_values = [record["usage"] for record in data[-14:]]  # 最近14天
        
        # 计算变化率的方差
        if len(usage_values) >= 3:
            changes = [usage_values[i+1] - usage_values[i] for i in range(len(usage_values)-1)]
            avg_change = sum(changes) / len(changes)
            variance = sum((change - avg_change) ** 2 for change in changes) / len(changes)
            
            if variance > avg_change ** 2:  # 高方差表示季节性
                return GrowthPattern.SEASONAL
            elif avg_change > 0 and all(c >= 0 for c in changes[-3:]):  # 持续增长
                return GrowthPattern.EXPONENTIAL
        
        return GrowthPattern.LINEAR
    
    def generate_recommendations(self) -> List[CapacityRecommendation]:
        """生成容量建议"""
        print("生成容量建议...")
        recommendations = []
        
        for resource_type, metric in self.capacity_metrics.items():
            # 当前状态评估
            if metric.utilization_percent >= metric.threshold_critical:
                status = "critical"
                priority = "urgent"
            elif metric.utilization_percent >= metric.threshold_warning:
                status = "warning"
                priority = "high"
            else:
                status = "healthy"
                priority = "low"
            
            # 预测何时达到阈值
            days_to_warning = self._calculate_days_to_threshold(
                metric, metric.threshold_warning
            )
            days_to_critical = self._calculate_days_to_threshold(
                metric, metric.threshold_critical
            )
            
            # 生成建议
            recommendation_text = self._generate_recommendation_text(
                resource_type, metric, status, days_to_warning, days_to_critical
            )
            
            # 确定行动时间线
            if status == "critical":
                timeline = 7  # 立即行动
            elif status == "warning":
                timeline = 30  # 一个月内
            elif days_to_warning and days_to_warning < 60:
                timeline = days_to_warning - 14  # 提前两周
                priority = "medium"
            else:
                timeline = 90  # 季度规划
            
            recommendation = CapacityRecommendation(
                resource_type=resource_type,
                current_status=status,
                recommendation=recommendation_text,
                action_required=status in ["warning", "critical"] or (days_to_warning and days_to_warning < 60),
                timeline_days=max(timeline, 1),
                priority=priority
            )
            
            recommendations.append(recommendation)
        
        self.recommendations = recommendations
        print("容量建议生成完成")
        return recommendations
    
    def _calculate_days_to_threshold(self, metric: CapacityMetric, threshold: float) -> Optional[int]:
        """计算达到阈值的天数"""
        if metric.growth_rate_percent <= 0:
            return None
        
        current_usage_percent = metric.utilization_percent
        if current_usage_percent >= threshold:
            return 0
        
        # 简化计算:假设线性增长
        daily_growth = metric.growth_rate_percent
        days_needed = (threshold - current_usage_percent) / daily_growth
        
        return int(days_needed) if days_needed > 0 else None
    
    def _generate_recommendation_text(self, resource_type: ResourceType, 
                                    metric: CapacityMetric, status: str,
                                    days_to_warning: Optional[int], 
                                    days_to_critical: Optional[int]) -> str:
        """生成建议文本"""
        resource_name = {
            ResourceType.CPU: "CPU",
            ResourceType.MEMORY: "内存",
            ResourceType.DISK: "磁盘",
            ResourceType.NETWORK: "网络",
            ResourceType.REGIONS: "Region"
        }[resource_type]
        
        if status == "critical":
            return f"{resource_name}使用率已达到临界水平({metric.utilization_percent:.1f}%),需要立即扩容"
        elif status == "warning":
            return f"{resource_name}使用率较高({metric.utilization_percent:.1f}%),建议在30天内扩容"
        elif days_to_warning and days_to_warning < 60:
            return f"{resource_name}预计在{days_to_warning}天后达到警告阈值,建议提前规划扩容"
        else:
            return f"{resource_name}使用率正常({metric.utilization_percent:.1f}%),继续监控"
    
    def get_capacity_report(self) -> Dict[str, Any]:
        """获取容量报告"""
        report = {
            "report_time": datetime.now().isoformat(),
            "cluster_summary": {
                "total_nodes": len(self.cluster_manager.nodes),
                "running_nodes": len([n for n in self.cluster_manager.nodes.values() 
                                     if n.status == NodeStatus.RUNNING])
            },
            "capacity_metrics": {},
            "recommendations": [],
            "action_items": []
        }
        
        # 容量指标
        for resource_type, metric in self.capacity_metrics.items():
            report["capacity_metrics"][resource_type.value] = {
                "current_utilization": metric.utilization_percent,
                "growth_rate_daily": metric.growth_rate_percent,
                "growth_pattern": metric.growth_pattern.value,
                "projected_utilization_30d": (metric.projected_usage_30d / metric.current_capacity * 100) if metric.current_capacity > 0 else 0,
                "projected_utilization_90d": (metric.projected_usage_90d / metric.current_capacity * 100) if metric.current_capacity > 0 else 0,
                "projected_utilization_1y": (metric.projected_usage_1y / metric.current_capacity * 100) if metric.current_capacity > 0 else 0
            }
        
        # 建议
        for rec in self.recommendations:
            report["recommendations"].append({
                "resource_type": rec.resource_type.value,
                "status": rec.current_status,
                "recommendation": rec.recommendation,
                "priority": rec.priority,
                "timeline_days": rec.timeline_days
            })
            
            if rec.action_required:
                report["action_items"].append({
                    "resource_type": rec.resource_type.value,
                    "action": rec.recommendation,
                    "priority": rec.priority,
                    "deadline": (datetime.now() + timedelta(days=rec.timeline_days)).isoformat()
                })
        
        return report

# 容量规划示例
print("\n=== HBase容量规划示例 ===")

# 创建容量规划器
capacity_planner = HBaseCapacityPlanner(cluster_manager, monitor)

print("1. 收集容量数据:")
capacity_planner.collect_capacity_data()

# 模拟历史数据
for i in range(14):  # 14天历史数据
    for resource_type in ResourceType:
        base_usage = random.uniform(50, 80)
        growth = i * random.uniform(0.5, 2.0)  # 模拟增长
        usage = base_usage + growth
        capacity = 100 * len([n for n in cluster_manager.nodes.values() if n.status == NodeStatus.RUNNING])
        
        capacity_planner._record_metric(resource_type, usage, capacity)

print("\n2. 分析容量趋势:")
metrics = capacity_planner.analyze_capacity_trends()
for resource_type, metric in metrics.items():
    print(f"  {resource_type.value}:")
    print(f"    当前使用率: {metric.utilization_percent:.1f}%")
    print(f"    增长率: {metric.growth_rate_percent:.2f}%/天")
    print(f"    增长模式: {metric.growth_pattern.value}")
    print(f"    30天预测: {metric.projected_usage_30d / metric.current_capacity * 100:.1f}%")

print("\n3. 生成容量建议:")
recommendations = capacity_planner.generate_recommendations()
for rec in recommendations:
    priority_icon = {"low": "🟢", "medium": "🟡", "high": "🟠", "urgent": "🔴"}.get(rec.priority, "⚪")
    print(f"  {priority_icon} {rec.resource_type.value}: {rec.recommendation}")
    print(f"    状态: {rec.current_status} | 优先级: {rec.priority} | 时间线: {rec.timeline_days}天")

print("\n4. 容量报告:")
report = capacity_planner.get_capacity_report()
print(f"  集群节点: {report['cluster_summary']['running_nodes']}/{report['cluster_summary']['total_nodes']}")
print(f"  需要行动的项目: {len(report['action_items'])}")
print(f"  总建议数: {len(report['recommendations'])}")

if report['action_items']:
    print("\n  紧急行动项目:")
    for item in report['action_items']:
        print(f"    - {item['resource_type']}: {item['action']} (截止: {item['deadline'][:10]})")

6. 总结

6.1 关键要点

本章详细介绍了HBase集群管理与运维的各个方面:

  1. 集群部署与配置

    • 节点管理和集群状态监控
    • 配置管理和版本控制
    • 自动化部署流程
  2. 监控与告警

    • 多维度性能监控
    • 智能告警规则
    • 实时状态跟踪
  3. 备份与恢复

    • 多种备份策略
    • 快照管理
    • 自动化恢复流程
  4. 故障排除

    • 全面的诊断检查
    • 自动问题检测
    • 结构化问题管理
  5. 运维最佳实践

    • 自动化运维流程
    • 容量规划和预测
    • 性能优化建议

6.2 最佳实践

  1. 监控策略

    • 建立多层次监控体系
    • 设置合理的告警阈值
    • 定期审查监控指标
  2. 备份策略

    • 制定定期备份计划
    • 测试恢复流程
    • 保持多个备份副本
  3. 自动化运维

    • 逐步实现运维自动化
    • 建立标准化操作流程
    • 持续优化自动化脚本
  4. 容量规划

    • 定期进行容量评估
    • 提前规划扩容需求
    • 监控资源使用趋势
  5. 故障处理

    • 建立故障响应流程
    • 维护故障知识库
    • 定期进行故障演练

6.3 下一步学习

  1. 深入学习

    • HBase内核原理
    • 性能调优技巧
    • 大规模集群管理
  2. 工具集成

    • 与Hadoop生态系统集成
    • 监控工具选择和配置
    • 自动化工具开发
  3. 实践项目

    • 搭建测试集群
    • 模拟故障场景
    • 开发运维工具

通过本章的学习,您应该能够: - 有效管理HBase集群 - 建立完善的监控体系 - 实施可靠的备份策略 - 快速诊断和解决问题 - 实现运维自动化 - 进行合理的容量规划

这些技能将帮助您在生产环境中稳定、高效地运行HBase集群。