1. YARN概述

1.1 YARN简介

YARN(Yet Another Resource Negotiator)是Hadoop 2.0引入的资源管理系统,它将资源管理和作业调度功能从MapReduce中分离出来,形成了一个通用的资源管理平台。YARN的设计目标是支持多种计算框架,不仅仅是MapReduce,还包括Spark、Storm、Flink等。

1.2 YARN的设计目标

  1. 可扩展性:支持更大规模的集群
  2. 多租户:支持多个应用程序同时运行
  3. 兼容性:向后兼容MapReduce 1.x
  4. 资源利用率:提高集群资源利用率
  5. 可靠性:提供高可用性和容错能力

1.3 YARN与MapReduce 1.x的对比

class YARNComparison:
    """
    YARN与MapReduce 1.x对比分析
    """
    
    def __init__(self):
        self.mapreduce_v1 = {
            'architecture': 'Master-Slave',
            'components': ['JobTracker', 'TaskTracker'],
            'scalability': '最大4000节点',
            'resource_management': 'JobTracker统一管理',
            'fault_tolerance': 'JobTracker单点故障',
            'multi_tenancy': '不支持',
            'frameworks': '仅支持MapReduce'
        }
        
        self.yarn = {
            'architecture': '分层架构',
            'components': ['ResourceManager', 'NodeManager', 'ApplicationMaster'],
            'scalability': '支持10000+节点',
            'resource_management': '分布式资源管理',
            'fault_tolerance': 'ResourceManager HA',
            'multi_tenancy': '支持多租户',
            'frameworks': '支持多种计算框架'
        }
    
    def compare_architectures(self) -> dict:
        """
        对比架构差异
        
        Returns:
            dict: 架构对比结果
        """
        comparison = {
            'scalability': {
                'mapreduce_v1': self.mapreduce_v1['scalability'],
                'yarn': self.yarn['scalability'],
                'improvement': '可扩展性提升2.5倍以上'
            },
            'fault_tolerance': {
                'mapreduce_v1': self.mapreduce_v1['fault_tolerance'],
                'yarn': self.yarn['fault_tolerance'],
                'improvement': '消除单点故障,支持高可用'
            },
            'resource_utilization': {
                'mapreduce_v1': '静态资源分配,利用率低',
                'yarn': '动态资源分配,利用率高',
                'improvement': '资源利用率提升30-50%'
            },
            'framework_support': {
                'mapreduce_v1': self.mapreduce_v1['frameworks'],
                'yarn': self.yarn['frameworks'],
                'improvement': '支持多种计算框架,生态更丰富'
            }
        }
        
        return comparison
    
    def analyze_benefits(self) -> List[dict]:
        """
        分析YARN的优势
        
        Returns:
            List[dict]: 优势列表
        """
        benefits = [
            {
                'category': '可扩展性',
                'description': 'YARN采用分层架构,将资源管理和应用管理分离',
                'technical_details': [
                    'ResourceManager专注于资源管理',
                    'ApplicationMaster负责应用生命周期管理',
                    '减少了单个组件的负载'
                ],
                'impact': '支持更大规模的集群部署'
            },
            {
                'category': '资源利用率',
                'description': 'YARN支持动态资源分配和细粒度资源管理',
                'technical_details': [
                    '容器化资源分配',
                    '支持内存和CPU的精确控制',
                    '资源可以在不同应用间动态调整'
                ],
                'impact': '显著提高集群资源利用率'
            },
            {
                'category': '多框架支持',
                'description': 'YARN提供了通用的资源管理接口',
                'technical_details': [
                    '标准化的ApplicationMaster接口',
                    '容器抽象屏蔽底层资源细节',
                    '支持不同类型的计算模式'
                ],
                'impact': '一个集群可以运行多种计算框架'
            },
            {
                'category': '容错性',
                'description': 'YARN提供了更好的容错机制',
                'technical_details': [
                    'ResourceManager高可用',
                    'ApplicationMaster故障恢复',
                    'Container级别的故障隔离'
                ],
                'impact': '提高了系统的可靠性和稳定性'
            }
        ]
        
        return benefits
    
    def generate_migration_guide(self) -> dict:
        """
        生成从MapReduce 1.x到YARN的迁移指南
        
        Returns:
            dict: 迁移指南
        """
        migration_guide = {
            'preparation': {
                'title': '迁移准备',
                'steps': [
                    '评估现有MapReduce作业',
                    '分析资源使用情况',
                    '制定迁移计划',
                    '准备测试环境'
                ]
            },
            'configuration_changes': {
                'title': '配置变更',
                'changes': [
                    {
                        'component': 'JobTracker',
                        'old_config': 'mapred.job.tracker',
                        'new_config': 'yarn.resourcemanager.address',
                        'description': 'JobTracker地址改为ResourceManager地址'
                    },
                    {
                        'component': 'TaskTracker',
                        'old_config': 'mapred.tasktracker.map.tasks.maximum',
                        'new_config': 'yarn.nodemanager.resource.memory-mb',
                        'description': '任务槽位配置改为内存配置'
                    }
                ]
            },
            'code_changes': {
                'title': '代码修改',
                'requirements': [
                    '更新Hadoop客户端库版本',
                    '修改作业提交代码',
                    '更新配置文件路径',
                    '测试作业兼容性'
                ]
            },
            'testing': {
                'title': '测试验证',
                'test_cases': [
                    '功能测试:验证作业正确性',
                    '性能测试:对比执行性能',
                    '稳定性测试:长时间运行测试',
                    '容错测试:故障恢复测试'
                ]
            }
        }
        
        return migration_guide

# 使用示例
if __name__ == "__main__":
    comparison = YARNComparison()
    
    # 对比架构
    arch_comparison = comparison.compare_architectures()
    print("=== YARN vs MapReduce 1.x 架构对比 ===")
    for aspect, details in arch_comparison.items():
        print(f"{aspect}: {details['improvement']}")
    
    # 分析优势
    benefits = comparison.analyze_benefits()
    print("\n=== YARN主要优势 ===")
    for benefit in benefits:
        print(f"- {benefit['category']}: {benefit['impact']}")
    
    # 生成迁移指南
    migration = comparison.generate_migration_guide()
    print("\n=== 迁移指南概要 ===")
    for phase, details in migration.items():
        print(f"- {details['title']}: {len(details.get('steps', details.get('changes', details.get('requirements', details.get('test_cases', [])))))个要点")

2. YARN架构

2.1 YARN核心组件

YARN采用分层架构设计,主要包含以下核心组件:

  1. ResourceManager (RM):全局资源管理器
  2. NodeManager (NM):节点资源管理器
  3. ApplicationMaster (AM):应用程序管理器
  4. Container:资源容器

2.2 组件详细说明

import time
import threading
from typing import Dict, List, Optional
from enum import Enum
from dataclasses import dataclass

class ContainerState(Enum):
    """容器状态枚举"""
    NEW = "NEW"
    ALLOCATED = "ALLOCATED"
    ACQUIRED = "ACQUIRED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    KILLED = "KILLED"

class ApplicationState(Enum):
    """应用状态枚举"""
    NEW = "NEW"
    SUBMITTED = "SUBMITTED"
    ACCEPTED = "ACCEPTED"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    FAILED = "FAILED"
    KILLED = "KILLED"

@dataclass
class Resource:
    """资源定义"""
    memory_mb: int
    vcores: int
    
    def __add__(self, other):
        return Resource(
            self.memory_mb + other.memory_mb,
            self.vcores + other.vcores
        )
    
    def __sub__(self, other):
        return Resource(
            max(0, self.memory_mb - other.memory_mb),
            max(0, self.vcores - other.vcores)
        )
    
    def can_satisfy(self, request: 'Resource') -> bool:
        """检查是否能满足资源请求"""
        return (self.memory_mb >= request.memory_mb and 
                self.vcores >= request.vcores)

@dataclass
class Container:
    """容器定义"""
    container_id: str
    node_id: str
    resource: Resource
    priority: int
    state: ContainerState = ContainerState.NEW
    start_time: Optional[float] = None
    finish_time: Optional[float] = None
    
    def start(self):
        """启动容器"""
        self.state = ContainerState.RUNNING
        self.start_time = time.time()
    
    def complete(self):
        """完成容器"""
        self.state = ContainerState.COMPLETED
        self.finish_time = time.time()

class NodeManager:
    """
    NodeManager模拟器
    负责单个节点的资源管理和容器生命周期管理
    """
    
    def __init__(self, node_id: str, total_resource: Resource):
        self.node_id = node_id
        self.total_resource = total_resource
        self.available_resource = Resource(total_resource.memory_mb, total_resource.vcores)
        self.containers: Dict[str, Container] = {}
        self.is_healthy = True
        self.last_heartbeat = time.time()
        self.resource_utilization_history = []
    
    def allocate_container(self, container: Container) -> bool:
        """
        分配容器
        
        Args:
            container: 要分配的容器
            
        Returns:
            bool: 是否分配成功
        """
        if not self.available_resource.can_satisfy(container.resource):
            return False
        
        # 分配资源
        self.available_resource = self.available_resource - container.resource
        container.state = ContainerState.ALLOCATED
        container.node_id = self.node_id
        self.containers[container.container_id] = container
        
        return True
    
    def start_container(self, container_id: str) -> bool:
        """
        启动容器
        
        Args:
            container_id: 容器ID
            
        Returns:
            bool: 是否启动成功
        """
        if container_id not in self.containers:
            return False
        
        container = self.containers[container_id]
        container.start()
        return True
    
    def stop_container(self, container_id: str) -> bool:
        """
        停止容器
        
        Args:
            container_id: 容器ID
            
        Returns:
            bool: 是否停止成功
        """
        if container_id not in self.containers:
            return False
        
        container = self.containers[container_id]
        container.complete()
        
        # 释放资源
        self.available_resource = self.available_resource + container.resource
        
        return True
    
    def heartbeat(self) -> dict:
        """
        向ResourceManager发送心跳
        
        Returns:
            dict: 心跳信息
        """
        self.last_heartbeat = time.time()
        
        # 计算资源利用率
        used_memory = self.total_resource.memory_mb - self.available_resource.memory_mb
        used_vcores = self.total_resource.vcores - self.available_resource.vcores
        
        utilization = {
            'memory_utilization': used_memory / self.total_resource.memory_mb,
            'cpu_utilization': used_vcores / self.total_resource.vcores
        }
        
        self.resource_utilization_history.append({
            'timestamp': time.time(),
            'utilization': utilization
        })
        
        # 保留最近100个记录
        if len(self.resource_utilization_history) > 100:
            self.resource_utilization_history.pop(0)
        
        return {
            'node_id': self.node_id,
            'is_healthy': self.is_healthy,
            'total_resource': self.total_resource,
            'available_resource': self.available_resource,
            'running_containers': len([c for c in self.containers.values() 
                                     if c.state == ContainerState.RUNNING]),
            'utilization': utilization
        }
    
    def get_container_status(self) -> List[dict]:
        """
        获取容器状态
        
        Returns:
            List[dict]: 容器状态列表
        """
        return [
            {
                'container_id': container.container_id,
                'state': container.state.value,
                'resource': container.resource,
                'start_time': container.start_time,
                'finish_time': container.finish_time
            }
            for container in self.containers.values()
        ]

class ApplicationMaster:
    """
    ApplicationMaster模拟器
    负责单个应用程序的生命周期管理
    """
    
    def __init__(self, app_id: str, app_type: str = "MapReduce"):
        self.app_id = app_id
        self.app_type = app_type
        self.state = ApplicationState.NEW
        self.resource_requests: List[dict] = []
        self.allocated_containers: Dict[str, Container] = {}
        self.completed_containers: List[Container] = []
        self.progress = 0.0
        self.start_time = None
        self.finish_time = None
    
    def submit_resource_request(self, resource: Resource, num_containers: int, 
                              priority: int = 1, locality: List[str] = None) -> str:
        """
        提交资源请求
        
        Args:
            resource: 请求的资源
            num_containers: 容器数量
            priority: 优先级
            locality: 本地性偏好
            
        Returns:
            str: 请求ID
        """
        request_id = f"req_{len(self.resource_requests)}"
        request = {
            'request_id': request_id,
            'resource': resource,
            'num_containers': num_containers,
            'priority': priority,
            'locality': locality or [],
            'timestamp': time.time()
        }
        
        self.resource_requests.append(request)
        return request_id
    
    def on_containers_allocated(self, containers: List[Container]):
        """
        处理容器分配事件
        
        Args:
            containers: 分配的容器列表
        """
        for container in containers:
            self.allocated_containers[container.container_id] = container
            # 启动容器中的任务
            self._start_task_in_container(container)
    
    def _start_task_in_container(self, container: Container):
        """
        在容器中启动任务
        
        Args:
            container: 容器
        """
        # 模拟任务启动
        container.start()
        
        # 模拟任务执行(在实际实现中,这里会启动具体的任务进程)
        def simulate_task():
            # 模拟任务执行时间
            execution_time = 10 + (container.priority * 5)  # 优先级越高执行时间越长
            time.sleep(execution_time)
            
            # 任务完成
            container.complete()
            self.completed_containers.append(container)
            del self.allocated_containers[container.container_id]
            
            # 更新进度
            self.progress = len(self.completed_containers) / (len(self.completed_containers) + len(self.allocated_containers))
        
        # 在新线程中执行任务
        task_thread = threading.Thread(target=simulate_task)
        task_thread.daemon = True
        task_thread.start()
    
    def on_containers_completed(self, containers: List[Container]):
        """
        处理容器完成事件
        
        Args:
            containers: 完成的容器列表
        """
        for container in containers:
            if container.container_id in self.allocated_containers:
                del self.allocated_containers[container.container_id]
            self.completed_containers.append(container)
    
    def get_progress(self) -> float:
        """
        获取应用程序进度
        
        Returns:
            float: 进度百分比
        """
        total_containers = len(self.completed_containers) + len(self.allocated_containers)
        if total_containers == 0:
            return 0.0
        return len(self.completed_containers) / total_containers
    
    def is_finished(self) -> bool:
        """
        检查应用程序是否完成
        
        Returns:
            bool: 是否完成
        """
        return (len(self.resource_requests) > 0 and 
                len(self.allocated_containers) == 0 and 
                self.get_progress() >= 1.0)

class ResourceManager:
    """
    ResourceManager模拟器
    负责全局资源管理和应用程序调度
    """
    
    def __init__(self):
        self.node_managers: Dict[str, NodeManager] = {}
        self.applications: Dict[str, ApplicationMaster] = {}
        self.pending_requests: List[dict] = []
        self.scheduler = None  # 调度器
        self.is_running = False
        self.total_resource = Resource(0, 0)
        self.available_resource = Resource(0, 0)
    
    def register_node_manager(self, node_manager: NodeManager):
        """
        注册NodeManager
        
        Args:
            node_manager: NodeManager实例
        """
        self.node_managers[node_manager.node_id] = node_manager
        self.total_resource = self.total_resource + node_manager.total_resource
        self.available_resource = self.available_resource + node_manager.available_resource
    
    def submit_application(self, application_master: ApplicationMaster) -> bool:
        """
        提交应用程序
        
        Args:
            application_master: ApplicationMaster实例
            
        Returns:
            bool: 是否提交成功
        """
        if application_master.app_id in self.applications:
            return False
        
        application_master.state = ApplicationState.SUBMITTED
        self.applications[application_master.app_id] = application_master
        
        # 启动应用程序
        application_master.state = ApplicationState.ACCEPTED
        application_master.start_time = time.time()
        
        return True
    
    def allocate_containers(self, app_id: str, resource_requests: List[dict]) -> List[Container]:
        """
        为应用程序分配容器
        
        Args:
            app_id: 应用程序ID
            resource_requests: 资源请求列表
            
        Returns:
            List[Container]: 分配的容器列表
        """
        allocated_containers = []
        
        for request in resource_requests:
            resource = request['resource']
            num_containers = request['num_containers']
            priority = request['priority']
            
            # 尝试分配容器
            for _ in range(num_containers):
                container = self._allocate_single_container(resource, priority)
                if container:
                    allocated_containers.append(container)
                else:
                    # 如果无法分配,将请求加入待处理队列
                    self.pending_requests.append({
                        'app_id': app_id,
                        'resource': resource,
                        'priority': priority,
                        'timestamp': time.time()
                    })
        
        return allocated_containers
    
    def _allocate_single_container(self, resource: Resource, priority: int) -> Optional[Container]:
        """
        分配单个容器
        
        Args:
            resource: 资源需求
            priority: 优先级
            
        Returns:
            Optional[Container]: 分配的容器,如果无法分配则返回None
        """
        # 寻找可用的NodeManager
        for node_id, nm in self.node_managers.items():
            if nm.available_resource.can_satisfy(resource):
                container_id = f"container_{int(time.time() * 1000)}_{node_id}"
                container = Container(
                    container_id=container_id,
                    node_id=node_id,
                    resource=resource,
                    priority=priority
                )
                
                if nm.allocate_container(container):
                    return container
        
        return None
    
    def process_heartbeats(self) -> dict:
        """
        处理所有NodeManager的心跳
        
        Returns:
            dict: 集群状态摘要
        """
        cluster_status = {
            'total_nodes': len(self.node_managers),
            'healthy_nodes': 0,
            'total_resource': self.total_resource,
            'available_resource': Resource(0, 0),
            'running_applications': len([app for app in self.applications.values() 
                                       if app.state == ApplicationState.RUNNING]),
            'node_utilization': []
        }
        
        for nm in self.node_managers.values():
            heartbeat = nm.heartbeat()
            
            if heartbeat['is_healthy']:
                cluster_status['healthy_nodes'] += 1
            
            cluster_status['available_resource'] = (
                cluster_status['available_resource'] + heartbeat['available_resource']
            )
            
            cluster_status['node_utilization'].append({
                'node_id': nm.node_id,
                'utilization': heartbeat['utilization']
            })
        
        return cluster_status
    
    def get_application_status(self, app_id: str) -> Optional[dict]:
        """
        获取应用程序状态
        
        Args:
            app_id: 应用程序ID
            
        Returns:
            Optional[dict]: 应用程序状态
        """
        if app_id not in self.applications:
            return None
        
        app = self.applications[app_id]
        return {
            'app_id': app.app_id,
            'app_type': app.app_type,
            'state': app.state.value,
            'progress': app.get_progress(),
            'allocated_containers': len(app.allocated_containers),
            'completed_containers': len(app.completed_containers),
            'start_time': app.start_time,
            'finish_time': app.finish_time
        }

# 使用示例
if __name__ == "__main__":
    # 创建ResourceManager
    rm = ResourceManager()
    
    # 创建NodeManager
    nm1 = NodeManager("node1", Resource(8192, 8))  # 8GB内存,8核CPU
    nm2 = NodeManager("node2", Resource(8192, 8))
    nm3 = NodeManager("node3", Resource(16384, 16))  # 16GB内存,16核CPU
    
    # 注册NodeManager
    rm.register_node_manager(nm1)
    rm.register_node_manager(nm2)
    rm.register_node_manager(nm3)
    
    # 创建应用程序
    app1 = ApplicationMaster("app_001", "MapReduce")
    app2 = ApplicationMaster("app_002", "Spark")
    
    # 提交应用程序
    rm.submit_application(app1)
    rm.submit_application(app2)
    
    # 应用程序请求资源
    app1.submit_resource_request(Resource(1024, 1), 4, priority=1)  # 请求4个容器
    app2.submit_resource_request(Resource(2048, 2), 2, priority=2)  # 请求2个容器
    
    # 分配容器
    containers1 = rm.allocate_containers("app_001", app1.resource_requests)
    containers2 = rm.allocate_containers("app_002", app2.resource_requests)
    
    # 通知应用程序容器已分配
    app1.on_containers_allocated(containers1)
    app2.on_containers_allocated(containers2)
    
    print("=== YARN集群状态 ===")
    cluster_status = rm.process_heartbeats()
    print(f"总节点数: {cluster_status['total_nodes']}")
    print(f"健康节点数: {cluster_status['healthy_nodes']}")
    print(f"运行中应用: {cluster_status['running_applications']}")
    
    print("\n=== 应用程序状态 ===")
    for app_id in ["app_001", "app_002"]:
        status = rm.get_application_status(app_id)
        if status:
            print(f"{app_id}: {status['state']}, 进度: {status['progress']:.1%}")

3. YARN调度器

3.1 调度器概述

YARN支持多种调度器,每种调度器都有不同的资源分配策略和适用场景:

  1. FIFO调度器:先进先出,简单但不支持多租户
  2. 容量调度器:支持多队列和资源保证
  3. 公平调度器:确保资源公平分配
  4. 优先级调度器:基于优先级进行调度

3.2 容量调度器实现

import heapq
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from enum import Enum
import time
import math

class QueueState(Enum):
    """队列状态枚举"""
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    DRAINING = "DRAINING"

@dataclass
class QueueConfig:
    """队列配置"""
    name: str
    capacity: float  # 容量百分比 (0.0-1.0)
    maximum_capacity: float  # 最大容量百分比
    state: QueueState = QueueState.RUNNING
    user_limit_factor: float = 1.0  # 用户资源限制因子
    maximum_applications: int = 10000  # 最大应用数
    maximum_am_resource_percent: float = 0.1  # AM资源最大百分比
    node_locality_delay: int = 40  # 节点本地性延迟
    rack_locality_delay: int = 40  # 机架本地性延迟
    preemption_disabled: bool = False  # 是否禁用抢占
    parent: Optional['QueueConfig'] = None
    children: List['QueueConfig'] = field(default_factory=list)
    
    def add_child_queue(self, child_queue: 'QueueConfig'):
        """添加子队列"""
        child_queue.parent = self
        self.children.append(child_queue)
    
    def get_absolute_capacity(self) -> float:
        """获取绝对容量"""
        if self.parent is None:
            return self.capacity
        return self.parent.get_absolute_capacity() * self.capacity
    
    def get_absolute_max_capacity(self) -> float:
        """获取绝对最大容量"""
        if self.parent is None:
            return self.maximum_capacity
        return min(self.parent.get_absolute_max_capacity(), 
                  self.parent.get_absolute_capacity() * self.maximum_capacity)

@dataclass
class QueueMetrics:
    """队列指标"""
    allocated_resource: Resource = field(default_factory=lambda: Resource(0, 0))
    pending_resource: Resource = field(default_factory=lambda: Resource(0, 0))
    available_resource: Resource = field(default_factory=lambda: Resource(0, 0))
    num_active_applications: int = 0
    num_pending_applications: int = 0
    num_allocated_containers: int = 0
    num_pending_containers: int = 0
    utilization: float = 0.0
    
    def update_utilization(self, total_resource: Resource):
        """更新利用率"""
        if total_resource.memory_mb > 0:
            self.utilization = self.allocated_resource.memory_mb / total_resource.memory_mb

class CapacitySchedulerQueue:
    """
    容量调度器队列实现
    """
    
    def __init__(self, config: QueueConfig, total_resource: Resource):
        self.config = config
        self.total_resource = total_resource
        self.metrics = QueueMetrics()
        self.applications: Dict[str, ApplicationMaster] = {}
        self.pending_applications: List[ApplicationMaster] = []
        self.user_resources: Dict[str, Resource] = {}  # 用户资源使用情况
        
        # 计算队列资源
        self.guaranteed_resource = Resource(
            int(total_resource.memory_mb * config.get_absolute_capacity()),
            int(total_resource.vcores * config.get_absolute_capacity())
        )
        
        self.max_resource = Resource(
            int(total_resource.memory_mb * config.get_absolute_max_capacity()),
            int(total_resource.vcores * config.get_absolute_max_capacity())
        )
    
    def submit_application(self, application: ApplicationMaster, user: str) -> bool:
        """
        提交应用程序到队列
        
        Args:
            application: 应用程序
            user: 用户名
            
        Returns:
            bool: 是否提交成功
        """
        # 检查队列状态
        if self.config.state != QueueState.RUNNING:
            return False
        
        # 检查应用数量限制
        if len(self.applications) >= self.config.maximum_applications:
            return False
        
        # 检查AM资源限制
        am_resource_limit = Resource(
            int(self.total_resource.memory_mb * self.config.maximum_am_resource_percent),
            int(self.total_resource.vcores * self.config.maximum_am_resource_percent)
        )
        
        current_am_resource = Resource(0, 0)
        for app in self.applications.values():
            if len(app.allocated_containers) > 0:
                # 假设第一个容器是AM容器
                am_container = list(app.allocated_containers.values())[0]
                current_am_resource = current_am_resource + am_container.resource
        
        # 估算新AM资源需求(通常是1GB内存,1核CPU)
        estimated_am_resource = Resource(1024, 1)
        if not (current_am_resource + estimated_am_resource).can_satisfy(am_resource_limit):
            return False
        
        # 添加到队列
        application.queue_name = self.config.name
        application.user = user
        self.applications[application.app_id] = application
        self.metrics.num_active_applications += 1
        
        return True
    
    def allocate_resources(self, available_resource: Resource) -> List[Tuple[str, Container]]:
        """
        为队列中的应用程序分配资源
        
        Args:
            available_resource: 可用资源
            
        Returns:
            List[Tuple[str, Container]]: 分配结果列表 (app_id, container)
        """
        allocations = []
        
        # 计算队列可用资源
        queue_available = self._calculate_queue_available_resource(available_resource)
        
        if queue_available.memory_mb <= 0 or queue_available.vcores <= 0:
            return allocations
        
        # 按优先级和提交时间排序应用程序
        sorted_apps = sorted(
            self.applications.values(),
            key=lambda app: (-app.resource_requests[0]['priority'] if app.resource_requests else 0, 
                           app.start_time or 0)
        )
        
        remaining_resource = queue_available
        
        for app in sorted_apps:
            if not app.resource_requests:
                continue
            
            # 检查用户资源限制
            user_limit = self._calculate_user_resource_limit(app.user)
            user_used = self.user_resources.get(app.user, Resource(0, 0))
            user_available = user_limit - user_used
            
            if user_available.memory_mb <= 0 or user_available.vcores <= 0:
                continue
            
            # 为应用程序分配资源
            for request in app.resource_requests:
                resource_needed = request['resource']
                num_containers = request['num_containers']
                
                # 计算实际可分配的容器数量
                max_by_resource = min(
                    remaining_resource.memory_mb // resource_needed.memory_mb,
                    remaining_resource.vcores // resource_needed.vcores
                )
                
                max_by_user = min(
                    user_available.memory_mb // resource_needed.memory_mb,
                    user_available.vcores // resource_needed.vcores
                )
                
                allocatable = min(num_containers, max_by_resource, max_by_user)
                
                # 分配容器
                for i in range(allocatable):
                    container_id = f"container_{int(time.time() * 1000)}_{i}"
                    container = Container(
                        container_id=container_id,
                        node_id="",  # 将由NodeManager分配
                        resource=resource_needed,
                        priority=request['priority']
                    )
                    
                    allocations.append((app.app_id, container))
                    
                    # 更新资源使用情况
                    remaining_resource = remaining_resource - resource_needed
                    user_available = user_available - resource_needed
                    
                    if app.user not in self.user_resources:
                        self.user_resources[app.user] = Resource(0, 0)
                    self.user_resources[app.user] = self.user_resources[app.user] + resource_needed
                    
                    # 更新请求
                    request['num_containers'] -= 1
                    
                    if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                        break
                
                if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                    break
            
            # 清理已满足的请求
            app.resource_requests = [req for req in app.resource_requests if req['num_containers'] > 0]
        
        return allocations
    
    def _calculate_queue_available_resource(self, cluster_available: Resource) -> Resource:
        """
        计算队列可用资源
        
        Args:
            cluster_available: 集群可用资源
            
        Returns:
            Resource: 队列可用资源
        """
        # 计算队列当前使用的资源
        current_used = self.metrics.allocated_resource
        
        # 计算队列保证资源的剩余部分
        guaranteed_available = self.guaranteed_resource - current_used
        
        # 如果队列使用超过保证资源,检查是否可以使用额外资源
        if guaranteed_available.memory_mb <= 0 or guaranteed_available.vcores <= 0:
            # 检查是否可以使用最大容量
            max_available = self.max_resource - current_used
            
            # 只能使用集群剩余资源的一部分
            extra_available = Resource(
                min(max_available.memory_mb, cluster_available.memory_mb),
                min(max_available.vcores, cluster_available.vcores)
            )
            
            return extra_available
        else:
            # 可以使用保证资源加上部分集群剩余资源
            total_available = Resource(
                min(guaranteed_available.memory_mb + cluster_available.memory_mb, 
                    self.max_resource.memory_mb - current_used.memory_mb),
                min(guaranteed_available.vcores + cluster_available.vcores,
                    self.max_resource.vcores - current_used.vcores)
            )
            
            return total_available
    
    def _calculate_user_resource_limit(self, user: str) -> Resource:
        """
        计算用户资源限制
        
        Args:
            user: 用户名
            
        Returns:
            Resource: 用户资源限制
        """
        # 用户限制 = 队列容量 * 用户限制因子 / 活跃用户数
        active_users = len(set(app.user for app in self.applications.values()))
        if active_users == 0:
            active_users = 1
        
        user_share = self.config.user_limit_factor / active_users
        
        return Resource(
            int(self.guaranteed_resource.memory_mb * user_share),
            int(self.guaranteed_resource.vcores * user_share)
        )
    
    def release_resources(self, app_id: str, containers: List[Container]):
        """
        释放资源
        
        Args:
            app_id: 应用程序ID
            containers: 要释放的容器列表
        """
        if app_id not in self.applications:
            return
        
        app = self.applications[app_id]
        
        for container in containers:
            # 更新队列资源使用情况
            self.metrics.allocated_resource = self.metrics.allocated_resource - container.resource
            self.metrics.num_allocated_containers -= 1
            
            # 更新用户资源使用情况
            if app.user in self.user_resources:
                self.user_resources[app.user] = self.user_resources[app.user] - container.resource
    
    def update_metrics(self):
        """
        更新队列指标
        """
        # 重新计算分配的资源
        total_allocated = Resource(0, 0)
        total_containers = 0
        
        for app in self.applications.values():
            for container in app.allocated_containers.values():
                total_allocated = total_allocated + container.resource
                total_containers += 1
        
        self.metrics.allocated_resource = total_allocated
        self.metrics.num_allocated_containers = total_containers
        self.metrics.num_active_applications = len(self.applications)
        
        # 计算待处理资源
        total_pending = Resource(0, 0)
        pending_containers = 0
        
        for app in self.applications.values():
            for request in app.resource_requests:
                resource_needed = request['resource']
                num_containers = request['num_containers']
                total_pending = total_pending + Resource(
                    resource_needed.memory_mb * num_containers,
                    resource_needed.vcores * num_containers
                )
                pending_containers += num_containers
        
        self.metrics.pending_resource = total_pending
        self.metrics.num_pending_containers = pending_containers
        
        # 更新利用率
        self.metrics.update_utilization(self.guaranteed_resource)

class CapacityScheduler:
    """
    容量调度器实现
    """
    
    def __init__(self, total_resource: Resource):
        self.total_resource = total_resource
        self.queues: Dict[str, CapacitySchedulerQueue] = {}
        self.queue_hierarchy: Dict[str, List[str]] = {}  # 队列层次结构
        self.preemption_enabled = True
        self.preemption_monitor_interval = 3000  # 抢占监控间隔(毫秒)
        self.last_preemption_check = time.time()
    
    def add_queue(self, queue_config: QueueConfig):
        """
        添加队列
        
        Args:
            queue_config: 队列配置
        """
        queue = CapacitySchedulerQueue(queue_config, self.total_resource)
        self.queues[queue_config.name] = queue
        
        # 更新层次结构
        if queue_config.parent:
            parent_name = queue_config.parent.name
            if parent_name not in self.queue_hierarchy:
                self.queue_hierarchy[parent_name] = []
            self.queue_hierarchy[parent_name].append(queue_config.name)
    
    def submit_application(self, application: ApplicationMaster, queue_name: str, user: str) -> bool:
        """
        提交应用程序
        
        Args:
            application: 应用程序
            queue_name: 队列名称
            user: 用户名
            
        Returns:
            bool: 是否提交成功
        """
        if queue_name not in self.queues:
            return False
        
        return self.queues[queue_name].submit_application(application, user)
    
    def allocate_resources(self, available_resource: Resource) -> Dict[str, List[Tuple[str, Container]]]:
        """
        分配资源给所有队列
        
        Args:
            available_resource: 可用资源
            
        Returns:
            Dict[str, List[Tuple[str, Container]]]: 每个队列的分配结果
        """
        allocations = {}
        remaining_resource = available_resource
        
        # 按队列容量排序,优先分配给容量大的队列
        sorted_queues = sorted(
            self.queues.items(),
            key=lambda x: x[1].config.get_absolute_capacity(),
            reverse=True
        )
        
        for queue_name, queue in sorted_queues:
            if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                break
            
            queue_allocations = queue.allocate_resources(remaining_resource)
            
            if queue_allocations:
                allocations[queue_name] = queue_allocations
                
                # 更新剩余资源
                for _, container in queue_allocations:
                    remaining_resource = remaining_resource - container.resource
        
        return allocations
    
    def check_preemption(self) -> List[Tuple[str, str, Container]]:
        """
        检查是否需要抢占资源
        
        Returns:
            List[Tuple[str, str, Container]]: 抢占列表 (from_queue, to_queue, container)
        """
        if not self.preemption_enabled:
            return []
        
        current_time = time.time()
        if (current_time - self.last_preemption_check) * 1000 < self.preemption_monitor_interval:
            return []
        
        self.last_preemption_check = current_time
        preemptions = []
        
        # 找出资源不足的队列
        under_served_queues = []
        over_served_queues = []
        
        for queue_name, queue in self.queues.items():
            utilization = queue.metrics.utilization
            guaranteed_utilization = queue.config.get_absolute_capacity()
            
            if utilization < guaranteed_utilization * 0.8:  # 使用率低于保证容量的80%
                under_served_queues.append((queue_name, guaranteed_utilization - utilization))
            elif utilization > guaranteed_utilization * 1.2:  # 使用率超过保证容量的120%
                over_served_queues.append((queue_name, utilization - guaranteed_utilization))
        
        # 从过度使用的队列抢占资源给不足的队列
        under_served_queues.sort(key=lambda x: x[1], reverse=True)  # 按需求排序
        over_served_queues.sort(key=lambda x: x[1], reverse=True)   # 按超额排序
        
        for under_queue_name, under_amount in under_served_queues:
            for over_queue_name, over_amount in over_served_queues:
                if under_amount <= 0:
                    break
                
                over_queue = self.queues[over_queue_name]
                
                # 找到可以抢占的容器(优先级低的)
                preemptable_containers = []
                for app in over_queue.applications.values():
                    for container in app.allocated_containers.values():
                        if container.priority <= 1:  # 低优先级容器
                            preemptable_containers.append((app.app_id, container))
                
                # 按优先级排序,优先抢占低优先级容器
                preemptable_containers.sort(key=lambda x: x[1].priority)
                
                for app_id, container in preemptable_containers:
                    if under_amount <= 0:
                        break
                    
                    preemptions.append((over_queue_name, under_queue_name, container))
                    under_amount -= container.resource.memory_mb / self.total_resource.memory_mb
                    over_amount -= container.resource.memory_mb / self.total_resource.memory_mb
        
        return preemptions
    
    def get_queue_info(self, queue_name: str) -> Optional[dict]:
        """
        获取队列信息
        
        Args:
            queue_name: 队列名称
            
        Returns:
            Optional[dict]: 队列信息
        """
        if queue_name not in self.queues:
            return None
        
        queue = self.queues[queue_name]
        
        return {
            'name': queue.config.name,
            'state': queue.config.state.value,
            'capacity': queue.config.capacity,
            'maximum_capacity': queue.config.maximum_capacity,
            'absolute_capacity': queue.config.get_absolute_capacity(),
            'absolute_max_capacity': queue.config.get_absolute_max_capacity(),
            'guaranteed_resource': queue.guaranteed_resource,
            'max_resource': queue.max_resource,
            'allocated_resource': queue.metrics.allocated_resource,
            'available_resource': queue.metrics.available_resource,
            'pending_resource': queue.metrics.pending_resource,
            'utilization': queue.metrics.utilization,
            'num_active_applications': queue.metrics.num_active_applications,
            'num_pending_applications': queue.metrics.num_pending_applications,
            'num_allocated_containers': queue.metrics.num_allocated_containers,
            'num_pending_containers': queue.metrics.num_pending_containers,
            'user_resources': queue.user_resources
        }
    
    def get_cluster_info(self) -> dict:
        """
        获取集群信息
        
        Returns:
            dict: 集群信息
        """
        total_allocated = Resource(0, 0)
        total_pending = Resource(0, 0)
        total_applications = 0
        total_containers = 0
        
        queue_infos = {}
        
        for queue_name, queue in self.queues.items():
            queue.update_metrics()
            queue_info = self.get_queue_info(queue_name)
            queue_infos[queue_name] = queue_info
            
            total_allocated = total_allocated + queue.metrics.allocated_resource
            total_pending = total_pending + queue.metrics.pending_resource
            total_applications += queue.metrics.num_active_applications
            total_containers += queue.metrics.num_allocated_containers
        
        cluster_utilization = 0.0
        if self.total_resource.memory_mb > 0:
            cluster_utilization = total_allocated.memory_mb / self.total_resource.memory_mb
        
        return {
            'total_resource': self.total_resource,
            'allocated_resource': total_allocated,
            'available_resource': self.total_resource - total_allocated,
            'pending_resource': total_pending,
            'utilization': cluster_utilization,
            'num_applications': total_applications,
            'num_containers': total_containers,
            'queues': queue_infos
        }

# 使用示例
if __name__ == "__main__":
    # 创建容量调度器
    total_resource = Resource(32768, 32)  # 32GB内存,32核CPU
    scheduler = CapacityScheduler(total_resource)
    
    # 创建队列配置
    root_queue = QueueConfig("root", 1.0, 1.0)
    
    # 创建子队列
    default_queue = QueueConfig("default", 0.4, 0.6)  # 40%容量,最大60%
    production_queue = QueueConfig("production", 0.4, 0.8)  # 40%容量,最大80%
    development_queue = QueueConfig("development", 0.2, 0.4)  # 20%容量,最大40%
    
    root_queue.add_child_queue(default_queue)
    root_queue.add_child_queue(production_queue)
    root_queue.add_child_queue(development_queue)
    
    # 添加队列到调度器
    scheduler.add_queue(default_queue)
    scheduler.add_queue(production_queue)
    scheduler.add_queue(development_queue)
    
    # 创建应用程序
    app1 = ApplicationMaster("app_001", "MapReduce")
    app2 = ApplicationMaster("app_002", "Spark")
    app3 = ApplicationMaster("app_003", "Flink")
    
    # 提交应用程序到不同队列
    scheduler.submit_application(app1, "production", "user1")
    scheduler.submit_application(app2, "default", "user2")
    scheduler.submit_application(app3, "development", "user3")
    
    # 应用程序请求资源
    app1.submit_resource_request(Resource(2048, 2), 4, priority=3)  # 高优先级
    app2.submit_resource_request(Resource(1024, 1), 6, priority=2)  # 中优先级
    app3.submit_resource_request(Resource(512, 1), 8, priority=1)   # 低优先级
    
    # 模拟资源分配
    available_resource = Resource(16384, 16)  # 50%资源可用
    allocations = scheduler.allocate_resources(available_resource)
    
    print("=== 容量调度器分配结果 ===")
    for queue_name, queue_allocations in allocations.items():
        print(f"队列 {queue_name}: 分配了 {len(queue_allocations)} 个容器")
        for app_id, container in queue_allocations:
            print(f"  应用 {app_id}: 容器 {container.container_id}, 资源 {container.resource.memory_mb}MB/{container.resource.vcores}核")
    
    # 获取集群信息
    cluster_info = scheduler.get_cluster_info()
    print("\n=== 集群状态 ===")
    print(f"总资源: {cluster_info['total_resource'].memory_mb}MB/{cluster_info['total_resource'].vcores}核")
    print(f"已分配: {cluster_info['allocated_resource'].memory_mb}MB/{cluster_info['allocated_resource'].vcores}核")
    print(f"利用率: {cluster_info['utilization']:.1%}")
    print(f"应用数: {cluster_info['num_applications']}")
    
    print("\n=== 队列状态 ===")
    for queue_name, queue_info in cluster_info['queues'].items():
        print(f"队列 {queue_name}:")
        print(f"  容量: {queue_info['capacity']:.1%} (最大: {queue_info['maximum_capacity']:.1%})")
        print(f"  利用率: {queue_info['utilization']:.1%}")
        print(f"  应用数: {queue_info['num_active_applications']}")
        print(f"  容器数: {queue_info['num_allocated_containers']}")

3.3 公平调度器实现

import math
from typing import Dict, List, Set
from dataclasses import dataclass
import time

@dataclass
class FairShareConfig:
    """公平份额配置"""
    name: str
    weight: float = 1.0  # 权重
    min_resources: Resource = None  # 最小资源保证
    max_resources: Resource = None  # 最大资源限制
    max_running_apps: int = None  # 最大运行应用数
    scheduling_policy: str = "fair"  # 调度策略: fair, fifo, drf
    preemption_timeout: int = 15000  # 抢占超时时间(毫秒)
    
    def __post_init__(self):
        if self.min_resources is None:
            self.min_resources = Resource(0, 0)
        if self.max_resources is None:
            self.max_resources = Resource(float('inf'), float('inf'))
        if self.max_running_apps is None:
            self.max_running_apps = float('inf')

class FairSchedulerQueue:
    """
    公平调度器队列实现
    """
    
    def __init__(self, config: FairShareConfig, total_resource: Resource):
        self.config = config
        self.total_resource = total_resource
        self.applications: Dict[str, ApplicationMaster] = {}
        self.allocated_resource = Resource(0, 0)
        self.demand = Resource(0, 0)  # 资源需求
        self.fair_share = Resource(0, 0)  # 公平份额
        self.steady_fair_share = Resource(0, 0)  # 稳定公平份额
        self.last_preemption_check = time.time()
    
    def submit_application(self, application: ApplicationMaster, user: str) -> bool:
        """
        提交应用程序到队列
        
        Args:
            application: 应用程序
            user: 用户名
            
        Returns:
            bool: 是否提交成功
        """
        if len(self.applications) >= self.config.max_running_apps:
            return False
        
        application.queue_name = self.config.name
        application.user = user
        self.applications[application.app_id] = application
        
        return True
    
    def calculate_demand(self):
        """
        计算队列资源需求
        """
        total_demand = Resource(0, 0)
        
        for app in self.applications.values():
            for request in app.resource_requests:
                resource_needed = request['resource']
                num_containers = request['num_containers']
                total_demand = total_demand + Resource(
                    resource_needed.memory_mb * num_containers,
                    resource_needed.vcores * num_containers
                )
        
        self.demand = total_demand
    
    def update_fair_share(self, total_fair_share: Resource, total_weight: float):
        """
        更新公平份额
        
        Args:
            total_fair_share: 总公平份额
            total_weight: 总权重
        """
        if total_weight > 0:
            weight_ratio = self.config.weight / total_weight
            self.fair_share = Resource(
                int(total_fair_share.memory_mb * weight_ratio),
                int(total_fair_share.vcores * weight_ratio)
            )
        else:
            self.fair_share = Resource(0, 0)
        
        # 确保不超过最大资源限制
        self.fair_share = Resource(
            min(self.fair_share.memory_mb, self.config.max_resources.memory_mb),
            min(self.fair_share.vcores, self.config.max_resources.vcores)
        )
        
        # 确保满足最小资源保证
        self.fair_share = Resource(
            max(self.fair_share.memory_mb, self.config.min_resources.memory_mb),
            max(self.fair_share.vcores, self.config.min_resources.vcores)
        )
    
    def calculate_usage_ratio(self) -> float:
        """
        计算使用率比例(相对于公平份额)
        
        Returns:
            float: 使用率比例
        """
        if self.fair_share.memory_mb == 0:
            return 0.0
        
        return self.allocated_resource.memory_mb / self.fair_share.memory_mb
    
    def is_starved(self) -> bool:
        """
        检查队列是否资源不足
        
        Returns:
            bool: 是否资源不足
        """
        # 如果分配的资源少于公平份额的一半,且有待处理的请求,则认为资源不足
        return (self.allocated_resource.memory_mb < self.fair_share.memory_mb * 0.5 and
                self.demand.memory_mb > self.allocated_resource.memory_mb)
    
    def needs_preemption(self) -> bool:
        """
        检查是否需要抢占资源
        
        Returns:
            bool: 是否需要抢占
        """
        current_time = time.time()
        time_since_last_check = (current_time - self.last_preemption_check) * 1000
        
        return (self.is_starved() and 
                time_since_last_check > self.config.preemption_timeout)

class FairScheduler:
    """
    公平调度器实现
    """
    
    def __init__(self, total_resource: Resource):
        self.total_resource = total_resource
        self.queues: Dict[str, FairSchedulerQueue] = {}
        self.preemption_enabled = True
        self.continuous_scheduling_enabled = True
        self.locality_threshold_node = 0.5  # 节点本地性阈值
        self.locality_threshold_rack = 0.6  # 机架本地性阈值
    
    def add_queue(self, config: FairShareConfig):
        """
        添加队列
        
        Args:
            config: 公平份额配置
        """
        queue = FairSchedulerQueue(config, self.total_resource)
        self.queues[config.name] = queue
    
    def submit_application(self, application: ApplicationMaster, queue_name: str, user: str) -> bool:
        """
        提交应用程序
        
        Args:
            application: 应用程序
            queue_name: 队列名称
            user: 用户名
            
        Returns:
            bool: 是否提交成功
        """
        if queue_name not in self.queues:
            return False
        
        return self.queues[queue_name].submit_application(application, user)
    
    def update_fair_shares(self):
        """
        更新所有队列的公平份额
        """
        # 计算总权重
        total_weight = sum(queue.config.weight for queue in self.queues.values())
        
        # 计算每个队列的需求
        for queue in self.queues.values():
            queue.calculate_demand()
        
        # 分配公平份额
        available_resource = self.total_resource
        
        # 首先满足最小资源保证
        for queue in self.queues.values():
            min_allocation = queue.config.min_resources
            queue.fair_share = Resource(
                min(min_allocation.memory_mb, available_resource.memory_mb),
                min(min_allocation.vcores, available_resource.vcores)
            )
            available_resource = available_resource - queue.fair_share
        
        # 按权重分配剩余资源
        if total_weight > 0:
            for queue in self.queues.values():
                additional_share = Resource(
                    int(available_resource.memory_mb * queue.config.weight / total_weight),
                    int(available_resource.vcores * queue.config.weight / total_weight)
                )
                
                # 确保不超过最大资源限制
                max_additional = Resource(
                    queue.config.max_resources.memory_mb - queue.fair_share.memory_mb,
                    queue.config.max_resources.vcores - queue.fair_share.vcores
                )
                
                additional_share = Resource(
                    min(additional_share.memory_mb, max_additional.memory_mb),
                    min(additional_share.vcores, max_additional.vcores)
                )
                
                queue.fair_share = queue.fair_share + additional_share
    
    def allocate_resources(self, available_resource: Resource) -> Dict[str, List[Tuple[str, Container]]]:
        """
        分配资源给所有队列
        
        Args:
            available_resource: 可用资源
            
        Returns:
            Dict[str, List[Tuple[str, Container]]]: 每个队列的分配结果
        """
        # 更新公平份额
        self.update_fair_shares()
        
        allocations = {}
        remaining_resource = available_resource
        
        # 按使用率排序队列,优先分配给使用率低的队列
        sorted_queues = sorted(
            self.queues.items(),
            key=lambda x: x[1].calculate_usage_ratio()
        )
        
        for queue_name, queue in sorted_queues:
            if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                break
            
            # 计算队列可以获得的资源
            queue_available = self._calculate_queue_allocation(queue, remaining_resource)
            
            if queue_available.memory_mb <= 0 or queue_available.vcores <= 0:
                continue
            
            # 为队列中的应用程序分配资源
            queue_allocations = self._allocate_to_applications(queue, queue_available)
            
            if queue_allocations:
                allocations[queue_name] = queue_allocations
                
                # 更新剩余资源和队列分配资源
                for _, container in queue_allocations:
                    remaining_resource = remaining_resource - container.resource
                    queue.allocated_resource = queue.allocated_resource + container.resource
        
        return allocations
    
    def _calculate_queue_allocation(self, queue: FairSchedulerQueue, available_resource: Resource) -> Resource:
        """
        计算队列可以获得的资源分配
        
        Args:
            queue: 队列
            available_resource: 可用资源
            
        Returns:
            Resource: 队列可获得的资源
        """
        # 计算队列还需要多少资源才能达到公平份额
        fair_share_deficit = Resource(
            max(0, queue.fair_share.memory_mb - queue.allocated_resource.memory_mb),
            max(0, queue.fair_share.vcores - queue.allocated_resource.vcores)
        )
        
        # 队列最多可以获得的资源是公平份额缺口和可用资源的最小值
        allocation = Resource(
            min(fair_share_deficit.memory_mb, available_resource.memory_mb),
            min(fair_share_deficit.vcores, available_resource.vcores)
        )
        
        # 如果队列已经达到或超过公平份额,但集群还有剩余资源,可以分配一部分
        if allocation.memory_mb == 0 and available_resource.memory_mb > 0:
            # 分配少量额外资源,但不超过队列需求
            extra_allocation = Resource(
                min(queue.demand.memory_mb - queue.allocated_resource.memory_mb,
                    available_resource.memory_mb // len(self.queues)),
                min(queue.demand.vcores - queue.allocated_resource.vcores,
                    available_resource.vcores // len(self.queues))
            )
            
            allocation = Resource(
                max(0, extra_allocation.memory_mb),
                max(0, extra_allocation.vcores)
            )
        
        return allocation
    
    def _allocate_to_applications(self, queue: FairSchedulerQueue, available_resource: Resource) -> List[Tuple[str, Container]]:
        """
        为队列中的应用程序分配资源
        
        Args:
            queue: 队列
            available_resource: 可用资源
            
        Returns:
            List[Tuple[str, Container]]: 分配结果列表
        """
        allocations = []
        remaining_resource = available_resource
        
        if queue.config.scheduling_policy == "fair":
            # 公平调度:按应用程序需求平均分配
            apps_with_demand = [app for app in queue.applications.values() if app.resource_requests]
            
            if not apps_with_demand:
                return allocations
            
            # 计算每个应用程序的公平份额
            app_fair_share = Resource(
                remaining_resource.memory_mb // len(apps_with_demand),
                remaining_resource.vcores // len(apps_with_demand)
            )
            
            for app in apps_with_demand:
                app_allocation = self._allocate_to_single_app(app, app_fair_share)
                allocations.extend(app_allocation)
                
                for _, container in app_allocation:
                    remaining_resource = remaining_resource - container.resource
                    if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                        break
        
        elif queue.config.scheduling_policy == "fifo":
            # FIFO调度:按提交时间顺序分配
            sorted_apps = sorted(
                queue.applications.values(),
                key=lambda app: app.start_time or 0
            )
            
            for app in sorted_apps:
                if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                    break
                
                app_allocation = self._allocate_to_single_app(app, remaining_resource)
                allocations.extend(app_allocation)
                
                for _, container in app_allocation:
                    remaining_resource = remaining_resource - container.resource
        
        elif queue.config.scheduling_policy == "drf":
            # DRF (Dominant Resource Fairness) 调度
            allocations = self._allocate_with_drf(queue, remaining_resource)
        
        return allocations
    
    def _allocate_to_single_app(self, app: ApplicationMaster, available_resource: Resource) -> List[Tuple[str, Container]]:
        """
        为单个应用程序分配资源
        
        Args:
            app: 应用程序
            available_resource: 可用资源
            
        Returns:
            List[Tuple[str, Container]]: 分配结果列表
        """
        allocations = []
        remaining_resource = available_resource
        
        for request in app.resource_requests:
            if remaining_resource.memory_mb <= 0 or remaining_resource.vcores <= 0:
                break
            
            resource_needed = request['resource']
            num_containers = request['num_containers']
            priority = request['priority']
            
            # 计算可以分配的容器数量
            max_containers = min(
                remaining_resource.memory_mb // resource_needed.memory_mb,
                remaining_resource.vcores // resource_needed.vcores,
                num_containers
            )
            
            # 分配容器
            for i in range(max_containers):
                container_id = f"container_{int(time.time() * 1000)}_{i}"
                container = Container(
                    container_id=container_id,
                    node_id="",
                    resource=resource_needed,
                    priority=priority
                )
                
                allocations.append((app.app_id, container))
                remaining_resource = remaining_resource - resource_needed
                request['num_containers'] -= 1
        
        # 清理已满足的请求
        app.resource_requests = [req for req in app.resource_requests if req['num_containers'] > 0]
        
        return allocations
    
    def _allocate_with_drf(self, queue: FairSchedulerQueue, available_resource: Resource) -> List[Tuple[str, Container]]:
        """
        使用DRF算法分配资源
        
        Args:
            queue: 队列
            available_resource: 可用资源
            
        Returns:
            List[Tuple[str, Container]]: 分配结果列表
        """
        allocations = []
        app_allocations = {app.app_id: Resource(0, 0) for app in queue.applications.values()}
        remaining_resource = available_resource
        
        while remaining_resource.memory_mb > 0 and remaining_resource.vcores > 0:
            # 找到主导资源份额最小的应用程序
            min_dominant_share = float('inf')
            selected_app = None
            selected_request = None
            
            for app in queue.applications.values():
                if not app.resource_requests:
                    continue
                
                for request in app.resource_requests:
                    resource_needed = request['resource']
                    
                    # 检查是否有足够资源
                    if (remaining_resource.memory_mb >= resource_needed.memory_mb and
                        remaining_resource.vcores >= resource_needed.vcores):
                        
                        # 计算分配后的主导资源份额
                        new_allocation = app_allocations[app.app_id] + resource_needed
                        dominant_share = max(
                            new_allocation.memory_mb / self.total_resource.memory_mb,
                            new_allocation.vcores / self.total_resource.vcores
                        )
                        
                        if dominant_share < min_dominant_share:
                            min_dominant_share = dominant_share
                            selected_app = app
                            selected_request = request
            
            if selected_app is None:
                break
            
            # 分配容器给选中的应用程序
            resource_needed = selected_request['resource']
            container_id = f"container_{int(time.time() * 1000)}"
            container = Container(
                container_id=container_id,
                node_id="",
                resource=resource_needed,
                priority=selected_request['priority']
            )
            
            allocations.append((selected_app.app_id, container))
            app_allocations[selected_app.app_id] = app_allocations[selected_app.app_id] + resource_needed
            remaining_resource = remaining_resource - resource_needed
            selected_request['num_containers'] -= 1
            
            # 如果请求已满足,从列表中移除
            if selected_request['num_containers'] == 0:
                selected_app.resource_requests.remove(selected_request)
        
        return allocations
    
    def check_preemption(self) -> List[Tuple[str, str, Container]]:
        """
        检查是否需要抢占资源
        
        Returns:
            List[Tuple[str, str, Container]]: 抢占列表 (from_queue, to_queue, container)
        """
        if not self.preemption_enabled:
            return []
        
        preemptions = []
        
        # 找出需要抢占资源的队列
        starved_queues = [queue for queue in self.queues.values() if queue.needs_preemption()]
        
        if not starved_queues:
            return preemptions
        
        # 按资源不足程度排序
        starved_queues.sort(key=lambda q: q.fair_share.memory_mb - q.allocated_resource.memory_mb, reverse=True)
        
        for starved_queue in starved_queues:
            # 找到可以抢占资源的队列(使用率超过公平份额的队列)
            over_allocated_queues = [
                queue for queue in self.queues.values()
                if queue.calculate_usage_ratio() > 1.0 and queue != starved_queue
            ]
            
            # 按超额使用程度排序
            over_allocated_queues.sort(key=lambda q: q.calculate_usage_ratio(), reverse=True)
            
            for over_queue in over_allocated_queues:
                # 计算需要抢占的资源量
                deficit = Resource(
                    starved_queue.fair_share.memory_mb - starved_queue.allocated_resource.memory_mb,
                    starved_queue.fair_share.vcores - starved_queue.allocated_resource.vcores
                )
                
                if deficit.memory_mb <= 0:
                    break
                
                # 找到可以抢占的容器
                preemptable_containers = []
                for app in over_queue.applications.values():
                    for container in app.allocated_containers.values():
                        preemptable_containers.append((app.app_id, container))
                
                # 按优先级排序,优先抢占低优先级容器
                preemptable_containers.sort(key=lambda x: x[1].priority)
                
                for app_id, container in preemptable_containers:
                    if deficit.memory_mb <= 0:
                        break
                    
                    preemptions.append((over_queue.config.name, starved_queue.config.name, container))
                    deficit = deficit - container.resource
        
        return preemptions
    
    def get_queue_info(self, queue_name: str) -> Optional[dict]:
        """
        获取队列信息
        
        Args:
            queue_name: 队列名称
            
        Returns:
            Optional[dict]: 队列信息
        """
        if queue_name not in self.queues:
            return None
        
        queue = self.queues[queue_name]
        
        return {
            'name': queue.config.name,
            'weight': queue.config.weight,
            'scheduling_policy': queue.config.scheduling_policy,
            'min_resources': queue.config.min_resources,
            'max_resources': queue.config.max_resources,
            'fair_share': queue.fair_share,
            'allocated_resource': queue.allocated_resource,
            'demand': queue.demand,
            'usage_ratio': queue.calculate_usage_ratio(),
            'is_starved': queue.is_starved(),
            'num_applications': len(queue.applications),
            'max_running_apps': queue.config.max_running_apps
        }
    
    def get_cluster_info(self) -> dict:
        """
        获取集群信息
        
        Returns:
            dict: 集群信息
        """
        total_allocated = Resource(0, 0)
        total_demand = Resource(0, 0)
        total_applications = 0
        
        queue_infos = {}
        
        for queue_name, queue in self.queues.items():
            queue_info = self.get_queue_info(queue_name)
            queue_infos[queue_name] = queue_info
            
            total_allocated = total_allocated + queue.allocated_resource
            total_demand = total_demand + queue.demand
            total_applications += len(queue.applications)
        
        cluster_utilization = 0.0
        if self.total_resource.memory_mb > 0:
            cluster_utilization = total_allocated.memory_mb / self.total_resource.memory_mb
        
        return {
            'total_resource': self.total_resource,
            'allocated_resource': total_allocated,
            'available_resource': self.total_resource - total_allocated,
            'total_demand': total_demand,
            'utilization': cluster_utilization,
            'num_applications': total_applications,
            'queues': queue_infos
        }

# 使用示例
if __name__ == "__main__":
    # 创建公平调度器
    total_resource = Resource(32768, 32)  # 32GB内存,32核CPU
    scheduler = FairScheduler(total_resource)
    
    # 创建队列配置
    default_config = FairShareConfig(
        name="default",
        weight=1.0,
        min_resources=Resource(4096, 4),  # 最小4GB内存,4核CPU
        max_resources=Resource(16384, 16),  # 最大16GB内存,16核CPU
        scheduling_policy="fair"
    )
    
    production_config = FairShareConfig(
        name="production",
        weight=3.0,  # 更高权重
        min_resources=Resource(8192, 8),  # 最小8GB内存,8核CPU
        max_resources=Resource(24576, 24),  # 最大24GB内存,24核CPU
        scheduling_policy="drf"
    )
    
    development_config = FairShareConfig(
        name="development",
        weight=0.5,  # 较低权重
        min_resources=Resource(2048, 2),  # 最小2GB内存,2核CPU
        max_resources=Resource(8192, 8),  # 最大8GB内存,8核CPU
        scheduling_policy="fifo"
    )
    
    # 添加队列到调度器
    scheduler.add_queue(default_config)
    scheduler.add_queue(production_config)
    scheduler.add_queue(development_config)
    
    # 创建应用程序
    app1 = ApplicationMaster("app_001", "MapReduce")
    app2 = ApplicationMaster("app_002", "Spark")
    app3 = ApplicationMaster("app_003", "Flink")
    app4 = ApplicationMaster("app_004", "Storm")
    
    # 提交应用程序到不同队列
    scheduler.submit_application(app1, "production", "user1")
    scheduler.submit_application(app2, "production", "user2")
    scheduler.submit_application(app3, "default", "user3")
    scheduler.submit_application(app4, "development", "user4")
    
    # 应用程序请求资源
    app1.submit_resource_request(Resource(2048, 2), 6, priority=3)  # 高优先级
    app2.submit_resource_request(Resource(4096, 4), 3, priority=2)  # 中优先级
    app3.submit_resource_request(Resource(1024, 1), 8, priority=2)  # 中优先级
    app4.submit_resource_request(Resource(512, 1), 4, priority=1)   # 低优先级
    
    # 模拟资源分配
    available_resource = Resource(20480, 20)  # 约60%资源可用
    allocations = scheduler.allocate_resources(available_resource)
    
    print("=== 公平调度器分配结果 ===")
    for queue_name, queue_allocations in allocations.items():
        print(f"队列 {queue_name}: 分配了 {len(queue_allocations)} 个容器")
        for app_id, container in queue_allocations:
            print(f"  应用 {app_id}: 容器 {container.container_id}, 资源 {container.resource.memory_mb}MB/{container.resource.vcores}核")
    
    # 获取集群信息
    cluster_info = scheduler.get_cluster_info()
    print("\n=== 集群状态 ===")
    print(f"总资源: {cluster_info['total_resource'].memory_mb}MB/{cluster_info['total_resource'].vcores}核")
    print(f"已分配: {cluster_info['allocated_resource'].memory_mb}MB/{cluster_info['allocated_resource'].vcores}核")
    print(f"总需求: {cluster_info['total_demand'].memory_mb}MB/{cluster_info['total_demand'].vcores}核")
    print(f"利用率: {cluster_info['utilization']:.1%}")
    print(f"应用数: {cluster_info['num_applications']}")
    
    print("\n=== 队列状态 ===")
    for queue_name, queue_info in cluster_info['queues'].items():
        print(f"队列 {queue_name}:")
        print(f"  权重: {queue_info['weight']}")
        print(f"  调度策略: {queue_info['scheduling_policy']}")
        print(f"  公平份额: {queue_info['fair_share'].memory_mb}MB/{queue_info['fair_share'].vcores}核")
        print(f"  已分配: {queue_info['allocated_resource'].memory_mb}MB/{queue_info['allocated_resource'].vcores}核")
        print(f"  使用率比例: {queue_info['usage_ratio']:.2f}")
        print(f"  是否资源不足: {queue_info['is_starved']}")
        print(f"  应用数: {queue_info['num_applications']}")
    
    # 检查抢占
    preemptions = scheduler.check_preemption()
    if preemptions:
        print("\n=== 抢占建议 ===")
        for from_queue, to_queue, container in preemptions:
            print(f"从队列 {from_queue} 抢占容器 {container.container_id} 给队列 {to_queue}")

4. YARN应用生命周期管理

4.1 应用程序状态管理

from enum import Enum
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
import time
import threading
from datetime import datetime

class ApplicationState(Enum):
    """应用程序状态枚举"""
    NEW = "NEW"                    # 新建
    NEW_SAVING = "NEW_SAVING"      # 保存中
    SUBMITTED = "SUBMITTED"        # 已提交
    ACCEPTED = "ACCEPTED"          # 已接受
    RUNNING = "RUNNING"            # 运行中
    FINISHED = "FINISHED"          # 已完成
    FAILED = "FAILED"              # 失败
    KILLED = "KILLED"              # 被杀死

class ApplicationAttemptState(Enum):
    """应用程序尝试状态枚举"""
    NEW = "NEW"                    # 新建
    SUBMITTED = "SUBMITTED"        # 已提交
    SCHEDULED = "SCHEDULED"        # 已调度
    ALLOCATED_SAVING = "ALLOCATED_SAVING"  # 分配保存中
    ALLOCATED = "ALLOCATED"        # 已分配
    LAUNCHED = "LAUNCHED"          # 已启动
    RUNNING = "RUNNING"            # 运行中
    FINISHING = "FINISHING"        # 完成中
    FINISHED = "FINISHED"          # 已完成
    FAILED = "FAILED"              # 失败
    KILLED = "KILLED"              # 被杀死

@dataclass
class ApplicationReport:
    """应用程序报告"""
    application_id: str
    application_type: str
    name: str
    queue: str
    user: str
    host: str
    rpc_port: int
    client_to_am_token: str
    state: ApplicationState
    diagnostics: str
    tracking_url: str
    start_time: int
    finish_time: int
    final_status: str
    progress: float
    application_tags: Set[str] = field(default_factory=set)
    
class ApplicationLifecycleManager:
    """
    YARN应用程序生命周期管理器
    """
    
    def __init__(self):
        self.applications: Dict[str, ApplicationReport] = {}
        self.application_attempts: Dict[str, List[str]] = {}  # app_id -> attempt_ids
        self.attempt_states: Dict[str, ApplicationAttemptState] = {}
        self.state_transitions: Dict[ApplicationState, Set[ApplicationState]] = {
            ApplicationState.NEW: {ApplicationState.NEW_SAVING, ApplicationState.FAILED, ApplicationState.KILLED},
            ApplicationState.NEW_SAVING: {ApplicationState.SUBMITTED, ApplicationState.FAILED, ApplicationState.KILLED},
            ApplicationState.SUBMITTED: {ApplicationState.ACCEPTED, ApplicationState.FAILED, ApplicationState.KILLED},
            ApplicationState.ACCEPTED: {ApplicationState.RUNNING, ApplicationState.FAILED, ApplicationState.KILLED},
            ApplicationState.RUNNING: {ApplicationState.FINISHED, ApplicationState.FAILED, ApplicationState.KILLED},
            ApplicationState.FINISHED: set(),
            ApplicationState.FAILED: set(),
            ApplicationState.KILLED: set()
        }
        self.max_attempts = 2  # 最大尝试次数
        self.attempt_interval = 10000  # 重试间隔(毫秒)
        self.lock = threading.Lock()
    
    def submit_application(self, app_id: str, app_type: str, name: str, 
                          queue: str, user: str, tags: Set[str] = None) -> bool:
        """
        提交应用程序
        
        Args:
            app_id: 应用程序ID
            app_type: 应用程序类型
            name: 应用程序名称
            queue: 队列名称
            user: 用户名
            tags: 应用程序标签
            
        Returns:
            bool: 是否提交成功
        """
        with self.lock:
            if app_id in self.applications:
                return False
            
            app_report = ApplicationReport(
                application_id=app_id,
                application_type=app_type,
                name=name,
                queue=queue,
                user=user,
                host="",
                rpc_port=0,
                client_to_am_token="",
                state=ApplicationState.NEW,
                diagnostics="",
                tracking_url="",
                start_time=int(time.time() * 1000),
                finish_time=0,
                final_status="UNDEFINED",
                progress=0.0,
                application_tags=tags or set()
            )
            
            self.applications[app_id] = app_report
            self.application_attempts[app_id] = []
            
            # 状态转换到NEW_SAVING
            self._transition_state(app_id, ApplicationState.NEW_SAVING)
            
            return True
    
    def accept_application(self, app_id: str) -> bool:
        """
        接受应用程序
        
        Args:
            app_id: 应用程序ID
            
        Returns:
            bool: 是否接受成功
        """
        with self.lock:
            if app_id not in self.applications:
                return False
            
            app = self.applications[app_id]
            
            # 检查状态转换是否合法
            if not self._can_transition(app.state, ApplicationState.ACCEPTED):
                return False
            
            # 状态转换
            self._transition_state(app_id, ApplicationState.SUBMITTED)
            self._transition_state(app_id, ApplicationState.ACCEPTED)
            
            # 创建第一个应用程序尝试
            attempt_id = f"{app_id}_000001"
            self.application_attempts[app_id].append(attempt_id)
            self.attempt_states[attempt_id] = ApplicationAttemptState.NEW
            
            return True
    
    def start_application_attempt(self, app_id: str, am_host: str, am_port: int, 
                                 tracking_url: str = "") -> Optional[str]:
        """
        启动应用程序尝试
        
        Args:
            app_id: 应用程序ID
            am_host: ApplicationMaster主机
            am_port: ApplicationMaster端口
            tracking_url: 跟踪URL
            
        Returns:
            Optional[str]: 尝试ID,如果失败返回None
        """
        with self.lock:
            if app_id not in self.applications:
                return None
            
            app = self.applications[app_id]
            attempts = self.application_attempts[app_id]
            
            if not attempts:
                return None
            
            current_attempt = attempts[-1]
            
            # 更新应用程序信息
            app.host = am_host
            app.rpc_port = am_port
            app.tracking_url = tracking_url
            
            # 状态转换
            self._transition_state(app_id, ApplicationState.RUNNING)
            self.attempt_states[current_attempt] = ApplicationAttemptState.RUNNING
            
            return current_attempt
    
    def finish_application(self, app_id: str, final_status: str, 
                          diagnostics: str = "") -> bool:
        """
        完成应用程序
        
        Args:
            app_id: 应用程序ID
            final_status: 最终状态
            diagnostics: 诊断信息
            
        Returns:
            bool: 是否完成成功
        """
        with self.lock:
            if app_id not in self.applications:
                return False
            
            app = self.applications[app_id]
            
            # 检查状态转换是否合法
            target_state = ApplicationState.FINISHED if final_status == "SUCCEEDED" else ApplicationState.FAILED
            
            if not self._can_transition(app.state, target_state):
                return False
            
            # 更新应用程序信息
            app.final_status = final_status
            app.diagnostics = diagnostics
            app.finish_time = int(time.time() * 1000)
            app.progress = 1.0
            
            # 状态转换
            self._transition_state(app_id, target_state)
            
            # 更新当前尝试状态
            attempts = self.application_attempts[app_id]
            if attempts:
                current_attempt = attempts[-1]
                self.attempt_states[current_attempt] = (
                    ApplicationAttemptState.FINISHED if target_state == ApplicationState.FINISHED 
                    else ApplicationAttemptState.FAILED
                )
            
            return True
    
    def kill_application(self, app_id: str, diagnostics: str = "") -> bool:
        """
        杀死应用程序
        
        Args:
            app_id: 应用程序ID
            diagnostics: 诊断信息
            
        Returns:
            bool: 是否杀死成功
        """
        with self.lock:
            if app_id not in self.applications:
                return False
            
            app = self.applications[app_id]
            
            # 检查状态转换是否合法
            if not self._can_transition(app.state, ApplicationState.KILLED):
                return False
            
            # 更新应用程序信息
            app.final_status = "KILLED"
            app.diagnostics = diagnostics
            app.finish_time = int(time.time() * 1000)
            
            # 状态转换
            self._transition_state(app_id, ApplicationState.KILLED)
            
            # 更新当前尝试状态
            attempts = self.application_attempts[app_id]
            if attempts:
                current_attempt = attempts[-1]
                self.attempt_states[current_attempt] = ApplicationAttemptState.KILLED
            
            return True
    
    def retry_application(self, app_id: str, failure_reason: str) -> Optional[str]:
        """
        重试应用程序
        
        Args:
            app_id: 应用程序ID
            failure_reason: 失败原因
            
        Returns:
            Optional[str]: 新的尝试ID,如果不能重试返回None
        """
        with self.lock:
            if app_id not in self.applications:
                return None
            
            attempts = self.application_attempts[app_id]
            
            # 检查是否还能重试
            if len(attempts) >= self.max_attempts:
                # 达到最大重试次数,标记应用程序为失败
                self.finish_application(app_id, "FAILED", 
                                       f"Maximum attempts ({self.max_attempts}) reached. Last failure: {failure_reason}")
                return None
            
            # 创建新的尝试
            attempt_number = len(attempts) + 1
            new_attempt_id = f"{app_id}_{attempt_number:06d}"
            
            attempts.append(new_attempt_id)
            self.attempt_states[new_attempt_id] = ApplicationAttemptState.NEW
            
            # 更新当前尝试状态为失败
            if attempts:
                previous_attempt = attempts[-2]
                self.attempt_states[previous_attempt] = ApplicationAttemptState.FAILED
            
            # 重置应用程序状态为ACCEPTED,准备重新运行
            self._transition_state(app_id, ApplicationState.ACCEPTED)
            
            return new_attempt_id
    
    def update_application_progress(self, app_id: str, progress: float) -> bool:
        """
        更新应用程序进度
        
        Args:
            app_id: 应用程序ID
            progress: 进度(0.0-1.0)
            
        Returns:
            bool: 是否更新成功
        """
        with self.lock:
            if app_id not in self.applications:
                return False
            
            app = self.applications[app_id]
            app.progress = max(0.0, min(1.0, progress))
            
            return True
    
    def get_application_report(self, app_id: str) -> Optional[ApplicationReport]:
        """
        获取应用程序报告
        
        Args:
            app_id: 应用程序ID
            
        Returns:
            Optional[ApplicationReport]: 应用程序报告
        """
        with self.lock:
            return self.applications.get(app_id)
    
    def get_applications(self, states: Set[ApplicationState] = None, 
                        users: Set[str] = None, queues: Set[str] = None,
                        application_types: Set[str] = None) -> List[ApplicationReport]:
        """
        获取应用程序列表
        
        Args:
            states: 状态过滤
            users: 用户过滤
            queues: 队列过滤
            application_types: 应用程序类型过滤
            
        Returns:
            List[ApplicationReport]: 应用程序报告列表
        """
        with self.lock:
            result = []
            
            for app in self.applications.values():
                # 状态过滤
                if states and app.state not in states:
                    continue
                
                # 用户过滤
                if users and app.user not in users:
                    continue
                
                # 队列过滤
                if queues and app.queue not in queues:
                    continue
                
                # 应用程序类型过滤
                if application_types and app.application_type not in application_types:
                    continue
                
                result.append(app)
            
            return result
    
    def get_application_attempts(self, app_id: str) -> List[dict]:
        """
        获取应用程序尝试列表
        
        Args:
            app_id: 应用程序ID
            
        Returns:
            List[dict]: 尝试信息列表
        """
        with self.lock:
            if app_id not in self.application_attempts:
                return []
            
            attempts = []
            for attempt_id in self.application_attempts[app_id]:
                attempt_info = {
                    'attempt_id': attempt_id,
                    'state': self.attempt_states.get(attempt_id, ApplicationAttemptState.NEW),
                    'start_time': int(time.time() * 1000),  # 简化实现
                    'finish_time': 0,
                    'diagnostics': ''
                }
                attempts.append(attempt_info)
            
            return attempts
    
    def _can_transition(self, from_state: ApplicationState, to_state: ApplicationState) -> bool:
        """
        检查状态转换是否合法
        
        Args:
            from_state: 源状态
            to_state: 目标状态
            
        Returns:
            bool: 是否可以转换
        """
        return to_state in self.state_transitions.get(from_state, set())
    
    def _transition_state(self, app_id: str, new_state: ApplicationState):
        """
        执行状态转换
        
        Args:
            app_id: 应用程序ID
            new_state: 新状态
        """
        if app_id in self.applications:
            old_state = self.applications[app_id].state
            self.applications[app_id].state = new_state
            print(f"应用程序 {app_id} 状态从 {old_state.value} 转换到 {new_state.value}")
    
    def get_cluster_metrics(self) -> dict:
        """
        获取集群指标
        
        Returns:
            dict: 集群指标
        """
        with self.lock:
            metrics = {
                'total_applications': len(self.applications),
                'applications_by_state': {},
                'applications_by_type': {},
                'applications_by_queue': {},
                'applications_by_user': {},
                'average_runtime': 0.0,
                'success_rate': 0.0
            }
            
            # 按状态统计
            for app in self.applications.values():
                state = app.state.value
                metrics['applications_by_state'][state] = metrics['applications_by_state'].get(state, 0) + 1
                
                # 按类型统计
                app_type = app.application_type
                metrics['applications_by_type'][app_type] = metrics['applications_by_type'].get(app_type, 0) + 1
                
                # 按队列统计
                queue = app.queue
                metrics['applications_by_queue'][queue] = metrics['applications_by_queue'].get(queue, 0) + 1
                
                # 按用户统计
                user = app.user
                metrics['applications_by_user'][user] = metrics['applications_by_user'].get(user, 0) + 1
            
            # 计算平均运行时间和成功率
            finished_apps = [app for app in self.applications.values() 
                           if app.state in {ApplicationState.FINISHED, ApplicationState.FAILED, ApplicationState.KILLED}]
            
            if finished_apps:
                total_runtime = sum(app.finish_time - app.start_time for app in finished_apps if app.finish_time > 0)
                metrics['average_runtime'] = total_runtime / len(finished_apps)
                
                successful_apps = [app for app in finished_apps if app.state == ApplicationState.FINISHED]
                metrics['success_rate'] = len(successful_apps) / len(finished_apps)
            
            return metrics

# 使用示例
if __name__ == "__main__":
    # 创建应用程序生命周期管理器
    lifecycle_manager = ApplicationLifecycleManager()
    
    # 提交应用程序
    app_id = "application_1234567890123_0001"
    success = lifecycle_manager.submit_application(
        app_id=app_id,
        app_type="MapReduce",
        name="WordCount Job",
        queue="default",
        user="hadoop",
        tags={"project", "batch"}
    )
    
    print(f"应用程序提交: {'成功' if success else '失败'}")
    
    # 接受应用程序
    success = lifecycle_manager.accept_application(app_id)
    print(f"应用程序接受: {'成功' if success else '失败'}")
    
    # 启动应用程序尝试
    attempt_id = lifecycle_manager.start_application_attempt(
        app_id=app_id,
        am_host="node1.cluster.com",
        am_port=8088,
        tracking_url="http://node1.cluster.com:8088/proxy/application_1234567890123_0001/"
    )
    
    print(f"应用程序尝试启动: {attempt_id}")
    
    # 更新进度
    for progress in [0.1, 0.3, 0.5, 0.7, 0.9]:
        lifecycle_manager.update_application_progress(app_id, progress)
        print(f"应用程序进度更新: {progress:.1%}")
        time.sleep(1)
    
    # 完成应用程序
    success = lifecycle_manager.finish_application(
        app_id=app_id,
        final_status="SUCCEEDED",
        diagnostics="Job completed successfully"
    )
    
    print(f"应用程序完成: {'成功' if success else '失败'}")
    
    # 获取应用程序报告
    report = lifecycle_manager.get_application_report(app_id)
    if report:
        print(f"\n=== 应用程序报告 ===")
        print(f"ID: {report.application_id}")
        print(f"名称: {report.name}")
        print(f"类型: {report.application_type}")
        print(f"状态: {report.state.value}")
        print(f"最终状态: {report.final_status}")
        print(f"进度: {report.progress:.1%}")
        print(f"用户: {report.user}")
        print(f"队列: {report.queue}")
        print(f"开始时间: {datetime.fromtimestamp(report.start_time/1000)}")
        print(f"结束时间: {datetime.fromtimestamp(report.finish_time/1000)}")
        print(f"运行时间: {(report.finish_time - report.start_time)/1000:.2f}秒")
        print(f"跟踪URL: {report.tracking_url}")
        print(f"标签: {', '.join(report.application_tags)}")
    
    # 获取集群指标
    metrics = lifecycle_manager.get_cluster_metrics()
    print(f"\n=== 集群指标 ===")
    print(f"总应用数: {metrics['total_applications']}")
    print(f"按状态分布: {metrics['applications_by_state']}")
    print(f"按类型分布: {metrics['applications_by_type']}")
    print(f"按队列分布: {metrics['applications_by_queue']}")
    print(f"按用户分布: {metrics['applications_by_user']}")
    print(f"平均运行时间: {metrics['average_runtime']/1000:.2f}秒")
    print(f"成功率: {metrics['success_rate']:.1%}")

4.2 容器生命周期管理

from enum import Enum
from typing import Dict, List, Optional, Set
from dataclasses import dataclass
import time
import threading
from datetime import datetime

class ContainerState(Enum):
    """容器状态枚举"""
    NEW = "NEW"                    # 新建
    LOCALIZING = "LOCALIZING"      # 本地化中
    LOCALIZATION_FAILED = "LOCALIZATION_FAILED"  # 本地化失败
    LOCALIZED = "LOCALIZED"        # 已本地化
    RUNNING = "RUNNING"            # 运行中
    EXITED_WITH_SUCCESS = "EXITED_WITH_SUCCESS"  # 成功退出
    EXITED_WITH_FAILURE = "EXITED_WITH_FAILURE"  # 失败退出
    KILLING = "KILLING"            # 杀死中
    CONTAINER_CLEANEDUP_AFTER_KILL = "CONTAINER_CLEANEDUP_AFTER_KILL"  # 杀死后清理
    CONTAINER_RESOURCES_CLEANINGUP = "CONTAINER_RESOURCES_CLEANINGUP"  # 资源清理中
    DONE = "DONE"                  # 完成

class ContainerExitStatus(Enum):
    """容器退出状态枚举"""
    SUCCESS = 0
    INVALID = -1000
    ABORTED = -100
    PREEMPTED = -102
    DISKS_FAILED = -101
    KILLED_EXCEEDED_VMEM = -103
    KILLED_EXCEEDED_PMEM = -104
    KILLED_BY_RESOURCEMANAGER = -105
    KILLED_BY_APPMASTER = -106
    KILLED_AFTER_APP_COMPLETION = -107

@dataclass
class ContainerStatus:
    """容器状态信息"""
    container_id: str
    state: ContainerState
    diagnostics: str
    exit_status: int
    
class ContainerLifecycleManager:
    """
    YARN容器生命周期管理器
    """
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.containers: Dict[str, Container] = {}
        self.container_states: Dict[str, ContainerState] = {}
        self.container_processes: Dict[str, dict] = {}  # 模拟进程信息
        self.container_resources: Dict[str, Resource] = {}
        self.container_start_times: Dict[str, int] = {}
        self.container_finish_times: Dict[str, int] = {}
        self.container_exit_codes: Dict[str, int] = {}
        self.container_diagnostics: Dict[str, str] = {}
        
        # 状态转换规则
        self.state_transitions = {
            ContainerState.NEW: {ContainerState.LOCALIZING, ContainerState.KILLING},
            ContainerState.LOCALIZING: {ContainerState.LOCALIZED, ContainerState.LOCALIZATION_FAILED, ContainerState.KILLING},
            ContainerState.LOCALIZATION_FAILED: {ContainerState.DONE},
            ContainerState.LOCALIZED: {ContainerState.RUNNING, ContainerState.KILLING},
            ContainerState.RUNNING: {ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_FAILURE, ContainerState.KILLING},
            ContainerState.EXITED_WITH_SUCCESS: {ContainerState.CONTAINER_RESOURCES_CLEANINGUP},
            ContainerState.EXITED_WITH_FAILURE: {ContainerState.CONTAINER_RESOURCES_CLEANINGUP},
            ContainerState.KILLING: {ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL},
            ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL: {ContainerState.CONTAINER_RESOURCES_CLEANINGUP},
            ContainerState.CONTAINER_RESOURCES_CLEANINGUP: {ContainerState.DONE},
            ContainerState.DONE: set()
        }
        
        self.lock = threading.Lock()
        self.resource_monitor = ResourceMonitor()
    
    def allocate_container(self, container: Container) -> bool:
        """
        分配容器
        
        Args:
            container: 容器对象
            
        Returns:
            bool: 是否分配成功
        """
        with self.lock:
            if container.container_id in self.containers:
                return False
            
            # 检查资源是否足够
            if not self._check_resource_availability(container.resource):
                return False
            
            # 分配容器
            self.containers[container.container_id] = container
            self.container_states[container.container_id] = ContainerState.NEW
            self.container_resources[container.container_id] = container.resource
            self.container_start_times[container.container_id] = int(time.time() * 1000)
            self.container_diagnostics[container.container_id] = ""
            
            print(f"容器 {container.container_id} 已分配到节点 {self.node_id}")
            return True
    
    def start_container(self, container_id: str, launch_context: dict) -> bool:
        """
        启动容器
        
        Args:
            container_id: 容器ID
            launch_context: 启动上下文
            
        Returns:
            bool: 是否启动成功
        """
        with self.lock:
            if container_id not in self.containers:
                return False
            
            current_state = self.container_states[container_id]
            
            # 检查状态转换是否合法
            if not self._can_transition(current_state, ContainerState.LOCALIZING):
                return False
            
            # 开始本地化过程
            self._transition_state(container_id, ContainerState.LOCALIZING)
            
            # 模拟本地化过程
            success = self._localize_resources(container_id, launch_context)
            
            if success:
                self._transition_state(container_id, ContainerState.LOCALIZED)
                
                # 启动容器进程
                success = self._launch_container_process(container_id, launch_context)
                
                if success:
                    self._transition_state(container_id, ContainerState.RUNNING)
                    print(f"容器 {container_id} 启动成功")
                    return True
                else:
                    self._transition_state(container_id, ContainerState.EXITED_WITH_FAILURE)
                    self.container_exit_codes[container_id] = ContainerExitStatus.INVALID.value
                    self.container_diagnostics[container_id] = "Failed to launch container process"
            else:
                self._transition_state(container_id, ContainerState.LOCALIZATION_FAILED)
                self.container_exit_codes[container_id] = ContainerExitStatus.INVALID.value
                self.container_diagnostics[container_id] = "Resource localization failed"
            
            return False
    
    def stop_container(self, container_id: str, reason: str = "") -> bool:
        """
        停止容器
        
        Args:
            container_id: 容器ID
            reason: 停止原因
            
        Returns:
            bool: 是否停止成功
        """
        with self.lock:
            if container_id not in self.containers:
                return False
            
            current_state = self.container_states[container_id]
            
            # 检查是否可以杀死
            if not self._can_transition(current_state, ContainerState.KILLING):
                return False
            
            # 转换到杀死状态
            self._transition_state(container_id, ContainerState.KILLING)
            
            # 杀死容器进程
            self._kill_container_process(container_id)
            
            # 清理容器
            self._cleanup_container(container_id)
            
            # 更新退出信息
            self.container_exit_codes[container_id] = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER.value
            self.container_diagnostics[container_id] = reason or "Container killed by ResourceManager"
            self.container_finish_times[container_id] = int(time.time() * 1000)
            
            print(f"容器 {container_id} 已停止: {reason}")
            return True
    
    def complete_container(self, container_id: str, exit_code: int, diagnostics: str = "") -> bool:
        """
        完成容器
        
        Args:
            container_id: 容器ID
            exit_code: 退出码
            diagnostics: 诊断信息
            
        Returns:
            bool: 是否完成成功
        """
        with self.lock:
            if container_id not in self.containers:
                return False
            
            current_state = self.container_states[container_id]
            
            if current_state != ContainerState.RUNNING:
                return False
            
            # 根据退出码确定状态
            if exit_code == ContainerExitStatus.SUCCESS.value:
                target_state = ContainerState.EXITED_WITH_SUCCESS
            else:
                target_state = ContainerState.EXITED_WITH_FAILURE
            
            # 状态转换
            self._transition_state(container_id, target_state)
            
            # 更新退出信息
            self.container_exit_codes[container_id] = exit_code
            self.container_diagnostics[container_id] = diagnostics
            self.container_finish_times[container_id] = int(time.time() * 1000)
            
            # 开始资源清理
            self._cleanup_container(container_id)
            
            print(f"容器 {container_id} 完成,退出码: {exit_code}")
            return True
    
    def get_container_status(self, container_id: str) -> Optional[ContainerStatus]:
        """
        获取容器状态
        
        Args:
            container_id: 容器ID
            
        Returns:
            Optional[ContainerStatus]: 容器状态
        """
        with self.lock:
            if container_id not in self.containers:
                return None
            
            return ContainerStatus(
                container_id=container_id,
                state=self.container_states[container_id],
                diagnostics=self.container_diagnostics.get(container_id, ""),
                exit_status=self.container_exit_codes.get(container_id, -1)
            )
    
    def get_container_report(self, container_id: str) -> Optional[dict]:
        """
        获取容器详细报告
        
        Args:
            container_id: 容器ID
            
        Returns:
            Optional[dict]: 容器报告
        """
        with self.lock:
            if container_id not in self.containers:
                return None
            
            container = self.containers[container_id]
            start_time = self.container_start_times.get(container_id, 0)
            finish_time = self.container_finish_times.get(container_id, 0)
            
            # 计算运行时间
            if finish_time > 0:
                runtime = finish_time - start_time
            else:
                runtime = int(time.time() * 1000) - start_time
            
            # 获取资源使用情况
            resource_usage = self.resource_monitor.get_container_usage(container_id)
            
            return {
                'container_id': container_id,
                'node_id': self.node_id,
                'state': self.container_states[container_id].value,
                'allocated_resource': {
                    'memory_mb': container.resource.memory_mb,
                    'vcores': container.resource.vcores
                },
                'used_resource': resource_usage,
                'priority': container.priority,
                'start_time': start_time,
                'finish_time': finish_time,
                'runtime_ms': runtime,
                'exit_code': self.container_exit_codes.get(container_id, -1),
                'diagnostics': self.container_diagnostics.get(container_id, ""),
                'process_info': self.container_processes.get(container_id, {})
            }
    
    def get_all_containers(self) -> List[dict]:
        """
        获取所有容器信息
        
        Returns:
            List[dict]: 容器信息列表
        """
        with self.lock:
            containers = []
            for container_id in self.containers.keys():
                report = self.get_container_report(container_id)
                if report:
                    containers.append(report)
            return containers
    
    def get_node_resource_usage(self) -> dict:
        """
        获取节点资源使用情况
        
        Returns:
            dict: 节点资源使用情况
        """
        with self.lock:
            total_allocated = Resource(0, 0)
            running_containers = 0
            
            for container_id, state in self.container_states.items():
                if state == ContainerState.RUNNING:
                    running_containers += 1
                    resource = self.container_resources.get(container_id, Resource(0, 0))
                    total_allocated = total_allocated + resource
            
            return {
                'node_id': self.node_id,
                'total_containers': len(self.containers),
                'running_containers': running_containers,
                'allocated_memory_mb': total_allocated.memory_mb,
                'allocated_vcores': total_allocated.vcores,
                'containers_by_state': self._get_containers_by_state()
            }
    
    def _check_resource_availability(self, required_resource: Resource) -> bool:
        """
        检查资源可用性
        
        Args:
            required_resource: 所需资源
            
        Returns:
            bool: 资源是否可用
        """
        # 简化实现,假设节点有足够资源
        return True
    
    def _localize_resources(self, container_id: str, launch_context: dict) -> bool:
        """
        本地化资源
        
        Args:
            container_id: 容器ID
            launch_context: 启动上下文
            
        Returns:
            bool: 本地化是否成功
        """
        # 模拟资源本地化过程
        time.sleep(0.1)  # 模拟本地化时间
        
        # 简化实现,假设本地化总是成功
        print(f"容器 {container_id} 资源本地化完成")
        return True
    
    def _launch_container_process(self, container_id: str, launch_context: dict) -> bool:
        """
        启动容器进程
        
        Args:
            container_id: 容器ID
            launch_context: 启动上下文
            
        Returns:
            bool: 启动是否成功
        """
        # 模拟进程启动
        process_info = {
            'pid': hash(container_id) % 65536,  # 模拟进程ID
            'command': launch_context.get('command', []),
            'environment': launch_context.get('environment', {}),
            'working_directory': launch_context.get('working_directory', '/tmp')
        }
        
        self.container_processes[container_id] = process_info
        
        # 启动资源监控
        self.resource_monitor.start_monitoring(container_id)
        
        print(f"容器 {container_id} 进程启动,PID: {process_info['pid']}")
        return True
    
    def _kill_container_process(self, container_id: str):
        """
        杀死容器进程
        
        Args:
            container_id: 容器ID
        """
        if container_id in self.container_processes:
            process_info = self.container_processes[container_id]
            print(f"杀死容器 {container_id} 进程,PID: {process_info['pid']}")
            
            # 停止资源监控
            self.resource_monitor.stop_monitoring(container_id)
    
    def _cleanup_container(self, container_id: str):
        """
        清理容器
        
        Args:
            container_id: 容器ID
        """
        # 状态转换到资源清理
        self._transition_state(container_id, ContainerState.CONTAINER_RESOURCES_CLEANINGUP)
        
        # 模拟清理过程
        time.sleep(0.05)
        
        # 最终状态
        self._transition_state(container_id, ContainerState.DONE)
        
        print(f"容器 {container_id} 清理完成")
    
    def _can_transition(self, from_state: ContainerState, to_state: ContainerState) -> bool:
        """
        检查状态转换是否合法
        
        Args:
            from_state: 源状态
            to_state: 目标状态
            
        Returns:
            bool: 是否可以转换
        """
        return to_state in self.state_transitions.get(from_state, set())
    
    def _transition_state(self, container_id: str, new_state: ContainerState):
        """
        执行状态转换
        
        Args:
            container_id: 容器ID
            new_state: 新状态
        """
        if container_id in self.container_states:
            old_state = self.container_states[container_id]
            self.container_states[container_id] = new_state
            print(f"容器 {container_id} 状态从 {old_state.value} 转换到 {new_state.value}")
    
    def _get_containers_by_state(self) -> dict:
        """
        按状态统计容器数量
        
        Returns:
            dict: 状态统计
        """
        state_counts = {}
        for state in self.container_states.values():
            state_counts[state.value] = state_counts.get(state.value, 0) + 1
        return state_counts

class ResourceMonitor:
    """
    资源监控器
    """
    
    def __init__(self):
        self.monitoring_containers: Set[str] = set()
        self.resource_usage: Dict[str, dict] = {}
    
    def start_monitoring(self, container_id: str):
        """
        开始监控容器资源使用
        
        Args:
            container_id: 容器ID
        """
        self.monitoring_containers.add(container_id)
        self.resource_usage[container_id] = {
            'memory_mb': 0,
            'vcores': 0.0,
            'disk_gb': 0.0,
            'network_mb': 0.0
        }
    
    def stop_monitoring(self, container_id: str):
        """
        停止监控容器资源使用
        
        Args:
            container_id: 容器ID
        """
        self.monitoring_containers.discard(container_id)
    
    def get_container_usage(self, container_id: str) -> dict:
        """
        获取容器资源使用情况
        
        Args:
            container_id: 容器ID
            
        Returns:
            dict: 资源使用情况
        """
        # 模拟资源使用数据
        if container_id in self.monitoring_containers:
            # 模拟动态资源使用
            import random
            return {
                'memory_mb': random.randint(512, 2048),
                'vcores': random.uniform(0.1, 2.0),
                'disk_gb': random.uniform(0.1, 5.0),
                'network_mb': random.uniform(0.0, 100.0)
            }
        
        return self.resource_usage.get(container_id, {
            'memory_mb': 0,
            'vcores': 0.0,
            'disk_gb': 0.0,
            'network_mb': 0.0
        })

# 使用示例
if __name__ == "__main__":
    # 创建容器生命周期管理器
    container_manager = ContainerLifecycleManager("node1.cluster.com")
    
    # 创建容器
    container = Container(
        container_id="container_1234567890123_0001_01_000001",
        node_id="node1.cluster.com",
        resource=Resource(2048, 2),
        priority=1
    )
    
    # 分配容器
    success = container_manager.allocate_container(container)
    print(f"容器分配: {'成功' if success else '失败'}")
    
    # 启动容器
    launch_context = {
        'command': ['java', '-jar', 'app.jar'],
        'environment': {'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk'},
        'working_directory': '/tmp/container_workspace'
    }
    
    success = container_manager.start_container(container.container_id, launch_context)
    print(f"容器启动: {'成功' if success else '失败'}")
    
    # 模拟容器运行
    time.sleep(2)
    
    # 获取容器状态
    status = container_manager.get_container_status(container.container_id)
    if status:
        print(f"\n=== 容器状态 ===")
        print(f"容器ID: {status.container_id}")
        print(f"状态: {status.state.value}")
        print(f"诊断信息: {status.diagnostics}")
        print(f"退出状态: {status.exit_status}")
    
    # 获取容器详细报告
    report = container_manager.get_container_report(container.container_id)
    if report:
        print(f"\n=== 容器报告 ===")
        print(f"容器ID: {report['container_id']}")
        print(f"节点ID: {report['node_id']}")
        print(f"状态: {report['state']}")
        print(f"分配资源: {report['allocated_resource']['memory_mb']}MB/{report['allocated_resource']['vcores']}核")
        print(f"使用资源: {report['used_resource']['memory_mb']}MB/{report['used_resource']['vcores']:.1f}核")
        print(f"优先级: {report['priority']}")
        print(f"运行时间: {report['runtime_ms']/1000:.2f}秒")
        print(f"进程信息: PID {report['process_info']['pid']}")
    
    # 完成容器
    success = container_manager.complete_container(
        container.container_id,
        ContainerExitStatus.SUCCESS.value,
        "Container completed successfully"
    )
    print(f"\n容器完成: {'成功' if success else '失败'}")
    
    # 获取节点资源使用情况
    node_usage = container_manager.get_node_resource_usage()
    print(f"\n=== 节点资源使用 ===")
    print(f"节点ID: {node_usage['node_id']}")
    print(f"总容器数: {node_usage['total_containers']}")
    print(f"运行容器数: {node_usage['running_containers']}")
    print(f"分配内存: {node_usage['allocated_memory_mb']}MB")
    print(f"分配CPU: {node_usage['allocated_vcores']}核")
    print(f"容器状态分布: {node_usage['containers_by_state']}")

5. YARN资源监控与性能优化

5.1 资源监控系统

from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import time
import threading
import json
from collections import defaultdict, deque

@dataclass
class ClusterMetrics:
    """集群指标"""
    timestamp: int
    total_memory_mb: int
    allocated_memory_mb: int
    available_memory_mb: int
    total_vcores: int
    allocated_vcores: int
    available_vcores: int
    total_nodes: int
    active_nodes: int
    decommissioned_nodes: int
    lost_nodes: int
    unhealthy_nodes: int
    total_applications: int
    running_applications: int
    pending_applications: int
    completed_applications: int
    failed_applications: int
    killed_applications: int
    containers_allocated: int
    containers_pending: int
    containers_reserved: int
    
@dataclass
class NodeMetrics:
    """节点指标"""
    node_id: str
    timestamp: int
    memory_mb: int
    allocated_memory_mb: int
    vcores: int
    allocated_vcores: int
    num_containers: int
    cpu_usage_percent: float
    memory_usage_percent: float
    disk_usage_percent: float
    network_io_mb: float
    disk_io_mb: float
    load_average: float
    health_status: str
    
@dataclass
class QueueMetrics:
    """队列指标"""
    queue_name: str
    timestamp: int
    capacity_percent: float
    max_capacity_percent: float
    used_capacity_percent: float
    absolute_capacity_percent: float
    absolute_max_capacity_percent: float
    absolute_used_capacity_percent: float
    num_applications: int
    num_pending_applications: int
    num_active_applications: int
    allocated_memory_mb: int
    allocated_vcores: int
    pending_memory_mb: int
    pending_vcores: int
    
class YARNResourceMonitor:
    """
    YARN资源监控系统
    """
    
    def __init__(self, history_size: int = 1000):
        self.history_size = history_size
        self.cluster_metrics_history: deque = deque(maxlen=history_size)
        self.node_metrics_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=history_size))
        self.queue_metrics_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=history_size))
        
        # 告警配置
        self.alert_thresholds = {
            'memory_usage': 85.0,  # 内存使用率阈值
            'cpu_usage': 80.0,     # CPU使用率阈值
            'disk_usage': 90.0,    # 磁盘使用率阈值
            'queue_capacity': 95.0, # 队列容量阈值
            'pending_applications': 50,  # 待处理应用数阈值
            'unhealthy_nodes': 2,   # 不健康节点数阈值
        }
        
        self.alerts: List[dict] = []
        self.monitoring_enabled = False
        self.monitor_thread = None
        self.lock = threading.Lock()
    
    def start_monitoring(self, interval_seconds: int = 30):
        """
        启动监控
        
        Args:
            interval_seconds: 监控间隔(秒)
        """
        if self.monitoring_enabled:
            return
        
        self.monitoring_enabled = True
        self.monitor_thread = threading.Thread(
            target=self._monitoring_loop,
            args=(interval_seconds,),
            daemon=True
        )
        self.monitor_thread.start()
        print(f"YARN资源监控已启动,监控间隔: {interval_seconds}秒")
    
    def stop_monitoring(self):
        """
        停止监控
        """
        self.monitoring_enabled = False
        if self.monitor_thread:
            self.monitor_thread.join()
        print("YARN资源监控已停止")
    
    def collect_cluster_metrics(self) -> ClusterMetrics:
        """
        收集集群指标
        
        Returns:
            ClusterMetrics: 集群指标
        """
        # 模拟收集集群指标
        import random
        
        total_memory = 102400  # 100GB
        total_vcores = 64
        total_nodes = 10
        
        allocated_memory = random.randint(int(total_memory * 0.3), int(total_memory * 0.8))
        allocated_vcores = random.randint(int(total_vcores * 0.3), int(total_vcores * 0.8))
        active_nodes = random.randint(8, total_nodes)
        
        metrics = ClusterMetrics(
            timestamp=int(time.time() * 1000),
            total_memory_mb=total_memory,
            allocated_memory_mb=allocated_memory,
            available_memory_mb=total_memory - allocated_memory,
            total_vcores=total_vcores,
            allocated_vcores=allocated_vcores,
            available_vcores=total_vcores - allocated_vcores,
            total_nodes=total_nodes,
            active_nodes=active_nodes,
            decommissioned_nodes=0,
            lost_nodes=total_nodes - active_nodes,
            unhealthy_nodes=random.randint(0, 2),
            total_applications=random.randint(50, 200),
            running_applications=random.randint(20, 80),
            pending_applications=random.randint(0, 30),
            completed_applications=random.randint(100, 500),
            failed_applications=random.randint(5, 20),
            killed_applications=random.randint(2, 10),
            containers_allocated=random.randint(100, 400),
            containers_pending=random.randint(0, 50),
            containers_reserved=random.randint(0, 20)
        )
        
        with self.lock:
            self.cluster_metrics_history.append(metrics)
        
        return metrics
    
    def collect_node_metrics(self, node_id: str) -> NodeMetrics:
        """
        收集节点指标
        
        Args:
            node_id: 节点ID
            
        Returns:
            NodeMetrics: 节点指标
        """
        # 模拟收集节点指标
        import random
        
        total_memory = 10240  # 10GB
        total_vcores = 8
        
        allocated_memory = random.randint(int(total_memory * 0.2), int(total_memory * 0.9))
        allocated_vcores = random.randint(1, total_vcores)
        
        metrics = NodeMetrics(
            node_id=node_id,
            timestamp=int(time.time() * 1000),
            memory_mb=total_memory,
            allocated_memory_mb=allocated_memory,
            vcores=total_vcores,
            allocated_vcores=allocated_vcores,
            num_containers=random.randint(5, 20),
            cpu_usage_percent=random.uniform(20.0, 85.0),
            memory_usage_percent=(allocated_memory / total_memory) * 100,
            disk_usage_percent=random.uniform(30.0, 80.0),
            network_io_mb=random.uniform(10.0, 100.0),
            disk_io_mb=random.uniform(50.0, 200.0),
            load_average=random.uniform(1.0, 4.0),
            health_status="HEALTHY" if random.random() > 0.1 else "UNHEALTHY"
        )
        
        with self.lock:
            self.node_metrics_history[node_id].append(metrics)
        
        return metrics
    
    def collect_queue_metrics(self, queue_name: str) -> QueueMetrics:
        """
        收集队列指标
        
        Args:
            queue_name: 队列名称
            
        Returns:
            QueueMetrics: 队列指标
        """
        # 模拟收集队列指标
        import random
        
        capacity = 50.0 if queue_name == "default" else 25.0
        max_capacity = 100.0
        used_capacity = random.uniform(10.0, capacity * 0.9)
        
        metrics = QueueMetrics(
            queue_name=queue_name,
            timestamp=int(time.time() * 1000),
            capacity_percent=capacity,
            max_capacity_percent=max_capacity,
            used_capacity_percent=used_capacity,
            absolute_capacity_percent=capacity,
            absolute_max_capacity_percent=max_capacity,
            absolute_used_capacity_percent=used_capacity,
            num_applications=random.randint(5, 30),
            num_pending_applications=random.randint(0, 10),
            num_active_applications=random.randint(2, 15),
            allocated_memory_mb=random.randint(5000, 20000),
            allocated_vcores=random.randint(10, 40),
            pending_memory_mb=random.randint(0, 5000),
            pending_vcores=random.randint(0, 10)
        )
        
        with self.lock:
            self.queue_metrics_history[queue_name].append(metrics)
        
        return metrics
    
    def check_alerts(self, cluster_metrics: ClusterMetrics, 
                    node_metrics: List[NodeMetrics], 
                    queue_metrics: List[QueueMetrics]):
        """
        检查告警
        
        Args:
            cluster_metrics: 集群指标
            node_metrics: 节点指标列表
            queue_metrics: 队列指标列表
        """
        alerts = []
        
        # 检查集群级别告警
        memory_usage = (cluster_metrics.allocated_memory_mb / cluster_metrics.total_memory_mb) * 100
        if memory_usage > self.alert_thresholds['memory_usage']:
            alerts.append({
                'type': 'CLUSTER_MEMORY_HIGH',
                'severity': 'WARNING',
                'message': f'集群内存使用率过高: {memory_usage:.1f}%',
                'timestamp': cluster_metrics.timestamp,
                'details': {
                    'current_usage': memory_usage,
                    'threshold': self.alert_thresholds['memory_usage'],
                    'allocated_mb': cluster_metrics.allocated_memory_mb,
                    'total_mb': cluster_metrics.total_memory_mb
                }
            })
        
        if cluster_metrics.pending_applications > self.alert_thresholds['pending_applications']:
            alerts.append({
                'type': 'PENDING_APPLICATIONS_HIGH',
                'severity': 'WARNING',
                'message': f'待处理应用数量过多: {cluster_metrics.pending_applications}',
                'timestamp': cluster_metrics.timestamp,
                'details': {
                    'pending_count': cluster_metrics.pending_applications,
                    'threshold': self.alert_thresholds['pending_applications']
                }
            })
        
        if cluster_metrics.unhealthy_nodes > self.alert_thresholds['unhealthy_nodes']:
            alerts.append({
                'type': 'UNHEALTHY_NODES_HIGH',
                'severity': 'CRITICAL',
                'message': f'不健康节点数量过多: {cluster_metrics.unhealthy_nodes}',
                'timestamp': cluster_metrics.timestamp,
                'details': {
                    'unhealthy_count': cluster_metrics.unhealthy_nodes,
                    'threshold': self.alert_thresholds['unhealthy_nodes'],
                    'total_nodes': cluster_metrics.total_nodes
                }
            })
        
        # 检查节点级别告警
        for node_metric in node_metrics:
            if node_metric.cpu_usage_percent > self.alert_thresholds['cpu_usage']:
                alerts.append({
                    'type': 'NODE_CPU_HIGH',
                    'severity': 'WARNING',
                    'message': f'节点 {node_metric.node_id} CPU使用率过高: {node_metric.cpu_usage_percent:.1f}%',
                    'timestamp': node_metric.timestamp,
                    'details': {
                        'node_id': node_metric.node_id,
                        'cpu_usage': node_metric.cpu_usage_percent,
                        'threshold': self.alert_thresholds['cpu_usage']
                    }
                })
            
            if node_metric.memory_usage_percent > self.alert_thresholds['memory_usage']:
                alerts.append({
                    'type': 'NODE_MEMORY_HIGH',
                    'severity': 'WARNING',
                    'message': f'节点 {node_metric.node_id} 内存使用率过高: {node_metric.memory_usage_percent:.1f}%',
                    'timestamp': node_metric.timestamp,
                    'details': {
                        'node_id': node_metric.node_id,
                        'memory_usage': node_metric.memory_usage_percent,
                        'threshold': self.alert_thresholds['memory_usage']
                    }
                })
            
            if node_metric.disk_usage_percent > self.alert_thresholds['disk_usage']:
                alerts.append({
                    'type': 'NODE_DISK_HIGH',
                    'severity': 'CRITICAL',
                    'message': f'节点 {node_metric.node_id} 磁盘使用率过高: {node_metric.disk_usage_percent:.1f}%',
                    'timestamp': node_metric.timestamp,
                    'details': {
                        'node_id': node_metric.node_id,
                        'disk_usage': node_metric.disk_usage_percent,
                        'threshold': self.alert_thresholds['disk_usage']
                    }
                })
            
            if node_metric.health_status != "HEALTHY":
                alerts.append({
                    'type': 'NODE_UNHEALTHY',
                    'severity': 'CRITICAL',
                    'message': f'节点 {node_metric.node_id} 状态不健康: {node_metric.health_status}',
                    'timestamp': node_metric.timestamp,
                    'details': {
                        'node_id': node_metric.node_id,
                        'health_status': node_metric.health_status
                    }
                })
        
        # 检查队列级别告警
        for queue_metric in queue_metrics:
            if queue_metric.used_capacity_percent > self.alert_thresholds['queue_capacity']:
                alerts.append({
                    'type': 'QUEUE_CAPACITY_HIGH',
                    'severity': 'WARNING',
                    'message': f'队列 {queue_metric.queue_name} 容量使用率过高: {queue_metric.used_capacity_percent:.1f}%',
                    'timestamp': queue_metric.timestamp,
                    'details': {
                        'queue_name': queue_metric.queue_name,
                        'used_capacity': queue_metric.used_capacity_percent,
                        'threshold': self.alert_thresholds['queue_capacity']
                    }
                })
        
        # 添加新告警
        with self.lock:
            self.alerts.extend(alerts)
            # 保持告警历史在合理范围内
            if len(self.alerts) > 1000:
                self.alerts = self.alerts[-500:]
        
        # 输出告警
        for alert in alerts:
            print(f"[{alert['severity']}] {alert['message']}")
    
    def get_cluster_metrics_history(self, duration_minutes: int = 60) -> List[ClusterMetrics]:
        """
        获取集群指标历史
        
        Args:
            duration_minutes: 历史时长(分钟)
            
        Returns:
            List[ClusterMetrics]: 集群指标历史
        """
        cutoff_time = int((time.time() - duration_minutes * 60) * 1000)
        
        with self.lock:
            return [m for m in self.cluster_metrics_history if m.timestamp >= cutoff_time]
    
    def get_node_metrics_history(self, node_id: str, duration_minutes: int = 60) -> List[NodeMetrics]:
        """
        获取节点指标历史
        
        Args:
            node_id: 节点ID
            duration_minutes: 历史时长(分钟)
            
        Returns:
            List[NodeMetrics]: 节点指标历史
        """
        cutoff_time = int((time.time() - duration_minutes * 60) * 1000)
        
        with self.lock:
            return [m for m in self.node_metrics_history[node_id] if m.timestamp >= cutoff_time]
    
    def get_queue_metrics_history(self, queue_name: str, duration_minutes: int = 60) -> List[QueueMetrics]:
        """
        获取队列指标历史
        
        Args:
            queue_name: 队列名称
            duration_minutes: 历史时长(分钟)
            
        Returns:
            List[QueueMetrics]: 队列指标历史
        """
        cutoff_time = int((time.time() - duration_minutes * 60) * 1000)
        
        with self.lock:
            return [m for m in self.queue_metrics_history[queue_name] if m.timestamp >= cutoff_time]
    
    def get_recent_alerts(self, duration_minutes: int = 60) -> List[dict]:
        """
        获取最近的告警
        
        Args:
            duration_minutes: 时长(分钟)
            
        Returns:
            List[dict]: 告警列表
        """
        cutoff_time = int((time.time() - duration_minutes * 60) * 1000)
        
        with self.lock:
            return [alert for alert in self.alerts if alert['timestamp'] >= cutoff_time]
    
    def generate_monitoring_report(self, duration_minutes: int = 60) -> dict:
        """
        生成监控报告
        
        Args:
            duration_minutes: 报告时长(分钟)
            
        Returns:
            dict: 监控报告
        """
        cluster_history = self.get_cluster_metrics_history(duration_minutes)
        recent_alerts = self.get_recent_alerts(duration_minutes)
        
        if not cluster_history:
            return {'error': '没有可用的监控数据'}
        
        # 计算统计信息
        avg_memory_usage = sum(m.allocated_memory_mb / m.total_memory_mb for m in cluster_history) / len(cluster_history) * 100
        avg_cpu_usage = sum(m.allocated_vcores / m.total_vcores for m in cluster_history) / len(cluster_history) * 100
        avg_pending_apps = sum(m.pending_applications for m in cluster_history) / len(cluster_history)
        
        # 按严重程度统计告警
        alert_counts = defaultdict(int)
        for alert in recent_alerts:
            alert_counts[alert['severity']] += 1
        
        report = {
            'report_period': {
                'duration_minutes': duration_minutes,
                'start_time': datetime.fromtimestamp(cluster_history[0].timestamp / 1000).isoformat(),
                'end_time': datetime.fromtimestamp(cluster_history[-1].timestamp / 1000).isoformat()
            },
            'cluster_summary': {
                'average_memory_usage_percent': round(avg_memory_usage, 2),
                'average_cpu_usage_percent': round(avg_cpu_usage, 2),
                'average_pending_applications': round(avg_pending_apps, 2),
                'total_data_points': len(cluster_history)
            },
            'alerts_summary': {
                'total_alerts': len(recent_alerts),
                'critical_alerts': alert_counts['CRITICAL'],
                'warning_alerts': alert_counts['WARNING'],
                'info_alerts': alert_counts['INFO']
            },
            'resource_trends': self._calculate_resource_trends(cluster_history),
            'recommendations': self._generate_recommendations(cluster_history, recent_alerts)
        }
        
        return report
    
    def _monitoring_loop(self, interval_seconds: int):
        """
        监控循环
        
        Args:
            interval_seconds: 监控间隔
        """
        while self.monitoring_enabled:
            try:
                # 收集集群指标
                cluster_metrics = self.collect_cluster_metrics()
                
                # 收集节点指标
                node_metrics = []
                for i in range(cluster_metrics.active_nodes):
                    node_id = f"node{i+1}.cluster.com"
                    node_metric = self.collect_node_metrics(node_id)
                    node_metrics.append(node_metric)
                
                # 收集队列指标
                queue_metrics = []
                for queue_name in ["default", "production", "development"]:
                    queue_metric = self.collect_queue_metrics(queue_name)
                    queue_metrics.append(queue_metric)
                
                # 检查告警
                self.check_alerts(cluster_metrics, node_metrics, queue_metrics)
                
            except Exception as e:
                print(f"监控过程中发生错误: {e}")
            
            time.sleep(interval_seconds)
    
    def _calculate_resource_trends(self, cluster_history: List[ClusterMetrics]) -> dict:
        """
        计算资源趋势
        
        Args:
            cluster_history: 集群指标历史
            
        Returns:
            dict: 资源趋势
        """
        if len(cluster_history) < 2:
            return {'error': '数据点不足,无法计算趋势'}
        
        # 计算内存使用趋势
        memory_usages = [(m.allocated_memory_mb / m.total_memory_mb) * 100 for m in cluster_history]
        memory_trend = 'increasing' if memory_usages[-1] > memory_usages[0] else 'decreasing'
        
        # 计算CPU使用趋势
        cpu_usages = [(m.allocated_vcores / m.total_vcores) * 100 for m in cluster_history]
        cpu_trend = 'increasing' if cpu_usages[-1] > cpu_usages[0] else 'decreasing'
        
        # 计算应用数量趋势
        app_counts = [m.running_applications for m in cluster_history]
        app_trend = 'increasing' if app_counts[-1] > app_counts[0] else 'decreasing'
        
        return {
            'memory_usage_trend': memory_trend,
            'cpu_usage_trend': cpu_trend,
            'application_count_trend': app_trend,
            'memory_usage_change_percent': round(memory_usages[-1] - memory_usages[0], 2),
            'cpu_usage_change_percent': round(cpu_usages[-1] - cpu_usages[0], 2),
            'application_count_change': app_counts[-1] - app_counts[0]
        }
    
    def _generate_recommendations(self, cluster_history: List[ClusterMetrics], 
                                recent_alerts: List[dict]) -> List[str]:
        """
        生成优化建议
        
        Args:
            cluster_history: 集群指标历史
            recent_alerts: 最近告警
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        if not cluster_history:
            return recommendations
        
        latest_metrics = cluster_history[-1]
        
        # 内存使用率建议
        memory_usage = (latest_metrics.allocated_memory_mb / latest_metrics.total_memory_mb) * 100
        if memory_usage > 85:
            recommendations.append("集群内存使用率过高,建议增加节点或优化应用内存使用")
        elif memory_usage < 30:
            recommendations.append("集群内存使用率较低,可以考虑减少节点数量以节约成本")
        
        # CPU使用率建议
        cpu_usage = (latest_metrics.allocated_vcores / latest_metrics.total_vcores) * 100
        if cpu_usage > 80:
            recommendations.append("集群CPU使用率过高,建议增加计算节点")
        elif cpu_usage < 25:
            recommendations.append("集群CPU使用率较低,可以考虑调整节点配置")
        
        # 待处理应用建议
        if latest_metrics.pending_applications > 20:
            recommendations.append("待处理应用数量较多,建议检查队列配置和资源分配策略")
        
        # 失败应用建议
        if latest_metrics.failed_applications > 10:
            recommendations.append("失败应用数量较多,建议检查应用配置和集群健康状态")
        
        # 不健康节点建议
        if latest_metrics.unhealthy_nodes > 0:
            recommendations.append(f"发现 {latest_metrics.unhealthy_nodes} 个不健康节点,建议立即检查节点状态")
        
        # 基于告警的建议
        critical_alerts = [a for a in recent_alerts if a['severity'] == 'CRITICAL']
        if critical_alerts:
            recommendations.append(f"发现 {len(critical_alerts)} 个严重告警,需要立即处理")
        
        warning_alerts = [a for a in recent_alerts if a['severity'] == 'WARNING']
        if len(warning_alerts) > 10:
            recommendations.append("警告告警数量较多,建议检查集群配置和资源分配")
        
        return recommendations
    
    def export_metrics(self, output_file: str, duration_minutes: int = 60, format: str = 'json'):
        """
        导出监控指标
        
        Args:
            output_file: 输出文件路径
            duration_minutes: 导出时长(分钟)
            format: 导出格式(json/csv)
        """
        cluster_history = self.get_cluster_metrics_history(duration_minutes)
        
        if format.lower() == 'json':
            data = {
                'export_time': datetime.now().isoformat(),
                'duration_minutes': duration_minutes,
                'cluster_metrics': [{
                    'timestamp': m.timestamp,
                    'memory_usage_percent': (m.allocated_memory_mb / m.total_memory_mb) * 100,
                    'cpu_usage_percent': (m.allocated_vcores / m.total_vcores) * 100,
                    'running_applications': m.running_applications,
                    'pending_applications': m.pending_applications,
                    'active_nodes': m.active_nodes,
                    'unhealthy_nodes': m.unhealthy_nodes
                } for m in cluster_history]
            }
            
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        
        elif format.lower() == 'csv':
            import csv
            
            with open(output_file, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'memory_usage_percent', 'cpu_usage_percent',
                    'running_applications', 'pending_applications', 'active_nodes', 'unhealthy_nodes'
                ])
                
                for m in cluster_history:
                    writer.writerow([
                        m.timestamp,
                        round((m.allocated_memory_mb / m.total_memory_mb) * 100, 2),
                        round((m.allocated_vcores / m.total_vcores) * 100, 2),
                        m.running_applications,
                        m.pending_applications,
                        m.active_nodes,
                        m.unhealthy_nodes
                    ])
        
        print(f"监控指标已导出到 {output_file}")

# 使用示例
if __name__ == "__main__":
    # 创建资源监控器
    monitor = YARNResourceMonitor()
    
    # 启动监控
    monitor.start_monitoring(interval_seconds=10)
    
    # 运行一段时间
    time.sleep(30)
    
    # 生成监控报告
    report = monitor.generate_monitoring_report(duration_minutes=5)
    
    print("\n=== YARN资源监控报告 ===")
    print(f"报告时间段: {report['report_period']['start_time']} - {report['report_period']['end_time']}")
    print(f"\n集群摘要:")
    print(f"  平均内存使用率: {report['cluster_summary']['average_memory_usage_percent']:.1f}%")
    print(f"  平均CPU使用率: {report['cluster_summary']['average_cpu_usage_percent']:.1f}%")
    print(f"  平均待处理应用: {report['cluster_summary']['average_pending_applications']:.1f}")
    
    print(f"\n告警摘要:")
    print(f"  总告警数: {report['alerts_summary']['total_alerts']}")
    print(f"  严重告警: {report['alerts_summary']['critical_alerts']}")
    print(f"  警告告警: {report['alerts_summary']['warning_alerts']}")
    
    print(f"\n资源趋势:")
    trends = report['resource_trends']
    if 'error' not in trends:
        print(f"  内存使用趋势: {trends['memory_usage_trend']} ({trends['memory_usage_change_percent']:+.1f}%)")
        print(f"  CPU使用趋势: {trends['cpu_usage_trend']} ({trends['cpu_usage_change_percent']:+.1f}%)")
        print(f"  应用数量趋势: {trends['application_count_trend']} ({trends['application_count_change']:+d})")
    
    print(f"\n优化建议:")
    for i, recommendation in enumerate(report['recommendations'], 1):
        print(f"  {i}. {recommendation}")
    
    # 获取最近告警
    recent_alerts = monitor.get_recent_alerts(duration_minutes=5)
    if recent_alerts:
        print(f"\n最近告警:")
        for alert in recent_alerts[-5:]:  # 显示最近5个告警
            timestamp = datetime.fromtimestamp(alert['timestamp'] / 1000).strftime('%H:%M:%S')
            print(f"  [{timestamp}] [{alert['severity']}] {alert['message']}")
    
    # 导出监控数据
    monitor.export_metrics('yarn_metrics.json', duration_minutes=5, format='json')
    monitor.export_metrics('yarn_metrics.csv', duration_minutes=5, format='csv')
    
    # 停止监控
     monitor.stop_monitoring()

5.2 性能优化策略

from typing import Dict, List, Optional, Tuple, Any
from dataclasses import dataclass, field
from datetime import datetime
import json
import math
from collections import defaultdict

@dataclass
class OptimizationRecommendation:
    """优化建议"""
    category: str  # 优化类别
    priority: str  # 优先级:HIGH, MEDIUM, LOW
    title: str     # 建议标题
    description: str  # 详细描述
    impact: str    # 预期影响
    implementation: List[str]  # 实施步骤
    config_changes: Dict[str, Any]  # 配置变更
    estimated_improvement: str  # 预估改进效果
    
class YARNPerformanceOptimizer:
    """
    YARN性能优化器
    """
    
    def __init__(self):
        # 性能基准值
        self.performance_baselines = {
            'memory_utilization_target': 75.0,  # 目标内存利用率
            'cpu_utilization_target': 70.0,     # 目标CPU利用率
            'queue_utilization_target': 80.0,   # 目标队列利用率
            'application_completion_time_target': 300,  # 目标应用完成时间(秒)
            'resource_allocation_time_target': 30,      # 目标资源分配时间(秒)
            'node_health_target': 95.0,         # 目标节点健康率
        }
        
        # 优化策略配置
        self.optimization_strategies = {
            'resource_allocation': {
                'enable_preemption': True,
                'enable_node_labels': True,
                'enable_resource_profiles': True,
                'optimize_container_sizing': True
            },
            'scheduling': {
                'enable_fair_scheduler': True,
                'optimize_queue_configuration': True,
                'enable_application_priorities': True,
                'optimize_placement_constraints': True
            },
            'memory_management': {
                'enable_memory_monitoring': True,
                'optimize_heap_sizes': True,
                'enable_off_heap_storage': True,
                'optimize_gc_settings': True
            },
            'network_optimization': {
                'enable_rack_awareness': True,
                'optimize_data_locality': True,
                'enable_compression': True,
                'optimize_shuffle_settings': True
            }
        }
    
    def analyze_cluster_performance(self, cluster_metrics: List[ClusterMetrics], 
                                  node_metrics: Dict[str, List[NodeMetrics]], 
                                  queue_metrics: Dict[str, List[QueueMetrics]]) -> Dict[str, Any]:
        """
        分析集群性能
        
        Args:
            cluster_metrics: 集群指标历史
            node_metrics: 节点指标历史
            queue_metrics: 队列指标历史
            
        Returns:
            Dict[str, Any]: 性能分析结果
        """
        if not cluster_metrics:
            return {'error': '没有可用的集群指标数据'}
        
        analysis = {
            'cluster_analysis': self._analyze_cluster_metrics(cluster_metrics),
            'node_analysis': self._analyze_node_metrics(node_metrics),
            'queue_analysis': self._analyze_queue_metrics(queue_metrics),
            'bottlenecks': self._identify_bottlenecks(cluster_metrics, node_metrics, queue_metrics),
            'performance_score': self._calculate_performance_score(cluster_metrics, node_metrics, queue_metrics)
        }
        
        return analysis
    
    def generate_optimization_recommendations(self, performance_analysis: Dict[str, Any]) -> List[OptimizationRecommendation]:
        """
        生成优化建议
        
        Args:
            performance_analysis: 性能分析结果
            
        Returns:
            List[OptimizationRecommendation]: 优化建议列表
        """
        recommendations = []
        
        if 'error' in performance_analysis:
            return recommendations
        
        cluster_analysis = performance_analysis.get('cluster_analysis', {})
        node_analysis = performance_analysis.get('node_analysis', {})
        queue_analysis = performance_analysis.get('queue_analysis', {})
        bottlenecks = performance_analysis.get('bottlenecks', [])
        
        # 基于集群分析的建议
        recommendations.extend(self._generate_cluster_recommendations(cluster_analysis))
        
        # 基于节点分析的建议
        recommendations.extend(self._generate_node_recommendations(node_analysis))
        
        # 基于队列分析的建议
        recommendations.extend(self._generate_queue_recommendations(queue_analysis))
        
        # 基于瓶颈分析的建议
        recommendations.extend(self._generate_bottleneck_recommendations(bottlenecks))
        
        # 按优先级排序
        priority_order = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2}
        recommendations.sort(key=lambda x: priority_order.get(x.priority, 3))
        
        return recommendations
    
    def generate_optimization_plan(self, recommendations: List[OptimizationRecommendation]) -> Dict[str, Any]:
        """
        生成优化计划
        
        Args:
            recommendations: 优化建议列表
            
        Returns:
            Dict[str, Any]: 优化计划
        """
        plan = {
            'plan_overview': {
                'total_recommendations': len(recommendations),
                'high_priority': len([r for r in recommendations if r.priority == 'HIGH']),
                'medium_priority': len([r for r in recommendations if r.priority == 'MEDIUM']),
                'low_priority': len([r for r in recommendations if r.priority == 'LOW']),
                'estimated_duration': self._estimate_implementation_duration(recommendations)
            },
            'implementation_phases': self._create_implementation_phases(recommendations),
            'configuration_changes': self._consolidate_config_changes(recommendations),
            'risk_assessment': self._assess_implementation_risks(recommendations),
            'success_metrics': self._define_success_metrics(recommendations)
        }
        
        return plan
    
    def _analyze_cluster_metrics(self, cluster_metrics: List[ClusterMetrics]) -> Dict[str, Any]:
        """
        分析集群指标
        
        Args:
            cluster_metrics: 集群指标列表
            
        Returns:
            Dict[str, Any]: 集群分析结果
        """
        if not cluster_metrics:
            return {}
        
        # 计算平均值
        avg_memory_usage = sum(m.allocated_memory_mb / m.total_memory_mb for m in cluster_metrics) / len(cluster_metrics) * 100
        avg_cpu_usage = sum(m.allocated_vcores / m.total_vcores for m in cluster_metrics) / len(cluster_metrics) * 100
        avg_pending_apps = sum(m.pending_applications for m in cluster_metrics) / len(cluster_metrics)
        avg_failed_apps = sum(m.failed_applications for m in cluster_metrics) / len(cluster_metrics)
        
        # 计算峰值
        peak_memory_usage = max(m.allocated_memory_mb / m.total_memory_mb for m in cluster_metrics) * 100
        peak_cpu_usage = max(m.allocated_vcores / m.total_vcores for m in cluster_metrics) * 100
        max_pending_apps = max(m.pending_applications for m in cluster_metrics)
        
        # 计算变异系数(稳定性指标)
        memory_usages = [(m.allocated_memory_mb / m.total_memory_mb) * 100 for m in cluster_metrics]
        memory_cv = (sum((x - avg_memory_usage) ** 2 for x in memory_usages) / len(memory_usages)) ** 0.5 / avg_memory_usage
        
        analysis = {
            'resource_utilization': {
                'avg_memory_usage_percent': round(avg_memory_usage, 2),
                'avg_cpu_usage_percent': round(avg_cpu_usage, 2),
                'peak_memory_usage_percent': round(peak_memory_usage, 2),
                'peak_cpu_usage_percent': round(peak_cpu_usage, 2),
                'memory_utilization_stability': round(1 / (1 + memory_cv), 2)  # 稳定性评分
            },
            'application_performance': {
                'avg_pending_applications': round(avg_pending_apps, 2),
                'max_pending_applications': max_pending_apps,
                'avg_failed_applications': round(avg_failed_apps, 2),
                'application_success_rate': round((1 - avg_failed_apps / max(1, sum(m.total_applications for m in cluster_metrics) / len(cluster_metrics))) * 100, 2)
            },
            'cluster_health': {
                'avg_active_nodes': round(sum(m.active_nodes for m in cluster_metrics) / len(cluster_metrics), 2),
                'avg_unhealthy_nodes': round(sum(m.unhealthy_nodes for m in cluster_metrics) / len(cluster_metrics), 2),
                'node_availability': round(sum(m.active_nodes / m.total_nodes for m in cluster_metrics) / len(cluster_metrics) * 100, 2)
            }
        }
        
        return analysis
    
    def _analyze_node_metrics(self, node_metrics: Dict[str, List[NodeMetrics]]) -> Dict[str, Any]:
        """
        分析节点指标
        
        Args:
            node_metrics: 节点指标字典
            
        Returns:
            Dict[str, Any]: 节点分析结果
        """
        if not node_metrics:
            return {}
        
        node_analysis = {}
        all_metrics = []
        
        for node_id, metrics in node_metrics.items():
            if not metrics:
                continue
            
            all_metrics.extend(metrics)
            
            # 分析单个节点
            avg_cpu = sum(m.cpu_usage_percent for m in metrics) / len(metrics)
            avg_memory = sum(m.memory_usage_percent for m in metrics) / len(metrics)
            avg_disk = sum(m.disk_usage_percent for m in metrics) / len(metrics)
            avg_load = sum(m.load_average for m in metrics) / len(metrics)
            
            node_analysis[node_id] = {
                'avg_cpu_usage': round(avg_cpu, 2),
                'avg_memory_usage': round(avg_memory, 2),
                'avg_disk_usage': round(avg_disk, 2),
                'avg_load_average': round(avg_load, 2),
                'health_status': metrics[-1].health_status if metrics else 'UNKNOWN'
            }
        
        # 整体节点分析
        if all_metrics:
            overall_analysis = {
                'total_nodes': len(node_metrics),
                'avg_cpu_usage_across_nodes': round(sum(m.cpu_usage_percent for m in all_metrics) / len(all_metrics), 2),
                'avg_memory_usage_across_nodes': round(sum(m.memory_usage_percent for m in all_metrics) / len(all_metrics), 2),
                'avg_disk_usage_across_nodes': round(sum(m.disk_usage_percent for m in all_metrics) / len(all_metrics), 2),
                'unhealthy_nodes': len([node for node, analysis in node_analysis.items() if analysis['health_status'] != 'HEALTHY']),
                'resource_imbalance': self._calculate_resource_imbalance(node_analysis)
            }
            
            node_analysis['overall'] = overall_analysis
        
        return node_analysis
    
    def _analyze_queue_metrics(self, queue_metrics: Dict[str, List[QueueMetrics]]) -> Dict[str, Any]:
        """
        分析队列指标
        
        Args:
            queue_metrics: 队列指标字典
            
        Returns:
            Dict[str, Any]: 队列分析结果
        """
        if not queue_metrics:
            return {}
        
        queue_analysis = {}
        
        for queue_name, metrics in queue_metrics.items():
            if not metrics:
                continue
            
            avg_used_capacity = sum(m.used_capacity_percent for m in metrics) / len(metrics)
            avg_pending_apps = sum(m.num_pending_applications for m in metrics) / len(metrics)
            avg_active_apps = sum(m.num_active_applications for m in metrics) / len(metrics)
            
            # 计算队列效率
            queue_efficiency = avg_used_capacity / metrics[0].capacity_percent if metrics[0].capacity_percent > 0 else 0
            
            queue_analysis[queue_name] = {
                'avg_used_capacity_percent': round(avg_used_capacity, 2),
                'capacity_percent': metrics[0].capacity_percent,
                'max_capacity_percent': metrics[0].max_capacity_percent,
                'avg_pending_applications': round(avg_pending_apps, 2),
                'avg_active_applications': round(avg_active_apps, 2),
                'queue_efficiency': round(queue_efficiency, 2),
                'utilization_trend': 'increasing' if metrics[-1].used_capacity_percent > metrics[0].used_capacity_percent else 'decreasing'
            }
        
        # 整体队列分析
        if queue_analysis:
            total_capacity = sum(analysis['capacity_percent'] for analysis in queue_analysis.values())
            total_used = sum(analysis['avg_used_capacity_percent'] for analysis in queue_analysis.values())
            
            overall_analysis = {
                'total_queues': len(queue_analysis),
                'overall_utilization': round(total_used / max(1, total_capacity) * 100, 2),
                'underutilized_queues': len([q for q, a in queue_analysis.items() if a['queue_efficiency'] < 0.5]),
                'overutilized_queues': len([q for q, a in queue_analysis.items() if a['avg_used_capacity_percent'] > 90])
            }
            
            queue_analysis['overall'] = overall_analysis
        
        return queue_analysis
    
    def _identify_bottlenecks(self, cluster_metrics: List[ClusterMetrics], 
                            node_metrics: Dict[str, List[NodeMetrics]], 
                            queue_metrics: Dict[str, List[QueueMetrics]]) -> List[Dict[str, Any]]:
        """
        识别性能瓶颈
        
        Args:
            cluster_metrics: 集群指标
            node_metrics: 节点指标
            queue_metrics: 队列指标
            
        Returns:
            List[Dict[str, Any]]: 瓶颈列表
        """
        bottlenecks = []
        
        if not cluster_metrics:
            return bottlenecks
        
        latest_cluster = cluster_metrics[-1]
        
        # 检查资源瓶颈
        memory_usage = (latest_cluster.allocated_memory_mb / latest_cluster.total_memory_mb) * 100
        cpu_usage = (latest_cluster.allocated_vcores / latest_cluster.total_vcores) * 100
        
        if memory_usage > 85:
            bottlenecks.append({
                'type': 'RESOURCE_BOTTLENECK',
                'category': 'MEMORY',
                'severity': 'HIGH',
                'description': f'集群内存使用率过高 ({memory_usage:.1f}%)',
                'impact': '可能导致应用排队等待资源,影响整体性能',
                'current_value': memory_usage,
                'threshold': 85.0
            })
        
        if cpu_usage > 80:
            bottlenecks.append({
                'type': 'RESOURCE_BOTTLENECK',
                'category': 'CPU',
                'severity': 'HIGH',
                'description': f'集群CPU使用率过高 ({cpu_usage:.1f}%)',
                'impact': '可能导致任务执行缓慢,影响应用完成时间',
                'current_value': cpu_usage,
                'threshold': 80.0
            })
        
        # 检查应用瓶颈
        if latest_cluster.pending_applications > 20:
            bottlenecks.append({
                'type': 'APPLICATION_BOTTLENECK',
                'category': 'SCHEDULING',
                'severity': 'MEDIUM',
                'description': f'待处理应用数量过多 ({latest_cluster.pending_applications})',
                'impact': '应用启动延迟,用户体验下降',
                'current_value': latest_cluster.pending_applications,
                'threshold': 20
            })
        
        # 检查节点瓶颈
        for node_id, metrics in node_metrics.items():
            if not metrics:
                continue
            
            latest_node = metrics[-1]
            
            if latest_node.cpu_usage_percent > 90:
                bottlenecks.append({
                    'type': 'NODE_BOTTLENECK',
                    'category': 'CPU',
                    'severity': 'HIGH',
                    'description': f'节点 {node_id} CPU使用率过高 ({latest_node.cpu_usage_percent:.1f}%)',
                    'impact': '节点性能下降,可能影响运行在该节点上的容器',
                    'node_id': node_id,
                    'current_value': latest_node.cpu_usage_percent,
                    'threshold': 90.0
                })
            
            if latest_node.memory_usage_percent > 90:
                bottlenecks.append({
                    'type': 'NODE_BOTTLENECK',
                    'category': 'MEMORY',
                    'severity': 'HIGH',
                    'description': f'节点 {node_id} 内存使用率过高 ({latest_node.memory_usage_percent:.1f}%)',
                    'impact': '可能导致内存不足,影响容器稳定性',
                    'node_id': node_id,
                    'current_value': latest_node.memory_usage_percent,
                    'threshold': 90.0
                })
        
        # 检查队列瓶颈
        for queue_name, metrics in queue_metrics.items():
            if not metrics:
                continue
            
            latest_queue = metrics[-1]
            
            if latest_queue.used_capacity_percent > 95:
                bottlenecks.append({
                    'type': 'QUEUE_BOTTLENECK',
                    'category': 'CAPACITY',
                    'severity': 'MEDIUM',
                    'description': f'队列 {queue_name} 容量使用率过高 ({latest_queue.used_capacity_percent:.1f}%)',
                    'impact': '队列接近满载,新应用可能被拒绝或延迟',
                    'queue_name': queue_name,
                    'current_value': latest_queue.used_capacity_percent,
                    'threshold': 95.0
                })
        
        return bottlenecks
    
    def _calculate_performance_score(self, cluster_metrics: List[ClusterMetrics], 
                                   node_metrics: Dict[str, List[NodeMetrics]], 
                                   queue_metrics: Dict[str, List[QueueMetrics]]) -> Dict[str, float]:
        """
        计算性能评分
        
        Args:
            cluster_metrics: 集群指标
            node_metrics: 节点指标
            queue_metrics: 队列指标
            
        Returns:
            Dict[str, float]: 性能评分
        """
        if not cluster_metrics:
            return {'overall_score': 0.0}
        
        scores = {}
        
        # 资源利用率评分 (0-100)
        latest_cluster = cluster_metrics[-1]
        memory_usage = (latest_cluster.allocated_memory_mb / latest_cluster.total_memory_mb) * 100
        cpu_usage = (latest_cluster.allocated_vcores / latest_cluster.total_vcores) * 100
        
        # 理想利用率为75%,偏离越大评分越低
        memory_score = max(0, 100 - abs(memory_usage - 75) * 2)
        cpu_score = max(0, 100 - abs(cpu_usage - 70) * 2)
        
        scores['resource_utilization_score'] = round((memory_score + cpu_score) / 2, 2)
        
        # 应用性能评分 (0-100)
        if latest_cluster.total_applications > 0:
            success_rate = (latest_cluster.completed_applications / latest_cluster.total_applications) * 100
            pending_penalty = min(50, latest_cluster.pending_applications * 2)  # 待处理应用的惩罚
            app_score = max(0, success_rate - pending_penalty)
        else:
            app_score = 100
        
        scores['application_performance_score'] = round(app_score, 2)
        
        # 集群健康评分 (0-100)
        if latest_cluster.total_nodes > 0:
            health_rate = (latest_cluster.active_nodes / latest_cluster.total_nodes) * 100
            unhealthy_penalty = latest_cluster.unhealthy_nodes * 10
            health_score = max(0, health_rate - unhealthy_penalty)
        else:
            health_score = 0
        
        scores['cluster_health_score'] = round(health_score, 2)
        
        # 队列效率评分 (0-100)
        if queue_metrics:
            queue_scores = []
            for queue_name, metrics in queue_metrics.items():
                if metrics:
                    latest_queue = metrics[-1]
                    if latest_queue.capacity_percent > 0:
                        efficiency = (latest_queue.used_capacity_percent / latest_queue.capacity_percent) * 100
                        # 理想效率为80%
                        queue_score = max(0, 100 - abs(efficiency - 80) * 1.5)
                        queue_scores.append(queue_score)
            
            if queue_scores:
                scores['queue_efficiency_score'] = round(sum(queue_scores) / len(queue_scores), 2)
            else:
                scores['queue_efficiency_score'] = 50.0
        else:
            scores['queue_efficiency_score'] = 50.0
        
        # 综合评分
        weights = {
            'resource_utilization_score': 0.3,
            'application_performance_score': 0.3,
            'cluster_health_score': 0.25,
            'queue_efficiency_score': 0.15
        }
        
        overall_score = sum(scores[key] * weight for key, weight in weights.items())
        scores['overall_score'] = round(overall_score, 2)
        
        return scores
    
    def _calculate_resource_imbalance(self, node_analysis: Dict[str, Dict[str, Any]]) -> float:
        """
        计算资源不平衡度
        
        Args:
            node_analysis: 节点分析结果
            
        Returns:
            float: 资源不平衡度 (0-1,越接近0越平衡)
        """
        cpu_usages = []
        memory_usages = []
        
        for node_id, analysis in node_analysis.items():
            if node_id == 'overall':
                continue
            
            cpu_usages.append(analysis.get('avg_cpu_usage', 0))
            memory_usages.append(analysis.get('avg_memory_usage', 0))
        
        if not cpu_usages or not memory_usages:
            return 0.0
        
        # 计算变异系数
        cpu_mean = sum(cpu_usages) / len(cpu_usages)
        memory_mean = sum(memory_usages) / len(memory_usages)
        
        cpu_cv = (sum((x - cpu_mean) ** 2 for x in cpu_usages) / len(cpu_usages)) ** 0.5 / max(1, cpu_mean)
        memory_cv = (sum((x - memory_mean) ** 2 for x in memory_usages) / len(memory_usages)) ** 0.5 / max(1, memory_mean)
        
        # 返回平均变异系数,限制在0-1范围内
        return min(1.0, (cpu_cv + memory_cv) / 2)
    
    def _generate_cluster_recommendations(self, cluster_analysis: Dict[str, Any]) -> List[OptimizationRecommendation]:
        """
        基于集群分析生成建议
        
        Args:
            cluster_analysis: 集群分析结果
            
        Returns:
            List[OptimizationRecommendation]: 建议列表
        """
        recommendations = []
        
        resource_util = cluster_analysis.get('resource_utilization', {})
        app_perf = cluster_analysis.get('application_performance', {})
        cluster_health = cluster_analysis.get('cluster_health', {})
        
        # 内存使用率建议
        avg_memory = resource_util.get('avg_memory_usage_percent', 0)
        if avg_memory > 85:
            recommendations.append(OptimizationRecommendation(
                category='resource_management',
                priority='HIGH',
                title='优化集群内存使用',
                description=f'集群平均内存使用率为 {avg_memory:.1f}%,超过建议阈值85%',
                impact='降低内存压力,减少应用等待时间,提高集群稳定性',
                implementation=[
                    '增加集群节点数量',
                    '优化应用内存配置',
                    '启用内存压缩',
                    '调整容器内存限制'
                ],
                config_changes={
                    'yarn.scheduler.maximum-allocation-mb': 'increase by 20%',
                    'yarn.nodemanager.resource.memory-mb': 'increase by 20%',
                    'yarn.app.mapreduce.am.resource.mb': 'optimize based on workload'
                },
                estimated_improvement='内存使用率降低10-15%,应用启动时间减少20%'
            ))
        elif avg_memory < 40:
            recommendations.append(OptimizationRecommendation(
                category='cost_optimization',
                priority='MEDIUM',
                title='优化集群资源配置',
                description=f'集群平均内存使用率为 {avg_memory:.1f}%,资源利用率较低',
                impact='提高资源利用率,降低运营成本',
                implementation=[
                    '减少集群节点数量',
                    '调整节点规格',
                    '优化资源分配策略'
                ],
                config_changes={
                    'yarn.scheduler.capacity.resource-calculator': 'org.apache.hadoop.yarn.util.resource.DominantResourceCalculator'
                },
                estimated_improvement='资源利用率提高15-20%,成本降低10-15%'
            ))
        
        # 应用性能建议
        success_rate = app_perf.get('application_success_rate', 100)
        if success_rate < 90:
            recommendations.append(OptimizationRecommendation(
                category='application_optimization',
                priority='HIGH',
                title='提高应用成功率',
                description=f'应用成功率为 {success_rate:.1f}%,低于建议阈值90%',
                impact='减少应用失败,提高用户满意度,降低重试成本',
                implementation=[
                    '分析应用失败原因',
                    '优化应用配置',
                    '增强错误处理机制',
                    '改进资源分配策略'
                ],
                config_changes={
                    'yarn.resourcemanager.am.max-attempts': '3',
                    'yarn.app.mapreduce.am.job.recovery.enable': 'true',
                    'mapreduce.map.maxattempts': '4',
                    'mapreduce.reduce.maxattempts': '4'
                },
                estimated_improvement='应用成功率提高5-10%,重试次数减少30%'
            ))
        
        # 节点健康建议
        node_availability = cluster_health.get('node_availability', 100)
        if node_availability < 95:
            recommendations.append(OptimizationRecommendation(
                category='infrastructure',
                priority='HIGH',
                title='改善集群节点健康状况',
                description=f'节点可用性为 {node_availability:.1f}%,低于建议阈值95%',
                impact='提高集群稳定性,减少任务失败,改善整体性能',
                implementation=[
                    '检查不健康节点的硬件状态',
                    '更新节点操作系统和软件',
                    '优化节点监控和告警',
                    '实施预防性维护计划'
                ],
                config_changes={
                    'yarn.nodemanager.health-checker.interval-ms': '60000',
                    'yarn.nodemanager.health-checker.script.timeout-ms': '20000'
                },
                estimated_improvement='节点可用性提高到98%以上,任务失败率降低50%'
            ))
        
        return recommendations
    
    def _generate_node_recommendations(self, node_analysis: Dict[str, Any]) -> List[OptimizationRecommendation]:
        """
        基于节点分析生成建议
        
        Args:
            node_analysis: 节点分析结果
            
        Returns:
            List[OptimizationRecommendation]: 建议列表
        """
        recommendations = []
        
        overall = node_analysis.get('overall', {})
        resource_imbalance = overall.get('resource_imbalance', 0)
        
        # 资源不平衡建议
        if resource_imbalance > 0.3:
            recommendations.append(OptimizationRecommendation(
                category='load_balancing',
                priority='MEDIUM',
                title='优化节点间负载均衡',
                description=f'节点间资源不平衡度为 {resource_imbalance:.2f},建议优化负载分布',
                impact='提高资源利用效率,避免热点节点,改善整体性能',
                implementation=[
                    '启用节点标签和约束',
                    '优化调度器配置',
                    '实施数据本地性优化',
                    '调整容器放置策略'
                ],
                config_changes={
                    'yarn.scheduler.capacity.node-locality-delay': '40',
                    'yarn.scheduler.capacity.rack-locality-additional-delay': '10',
                    'yarn.nodemanager.node-labels.provider': 'config'
                },
                estimated_improvement='资源分布均匀度提高25%,热点问题减少40%'
            ))
        
        # 检查高负载节点
        high_load_nodes = []
        for node_id, analysis in node_analysis.items():
            if node_id == 'overall':
                continue
            
            if analysis.get('avg_cpu_usage', 0) > 85 or analysis.get('avg_memory_usage', 0) > 85:
                high_load_nodes.append(node_id)
        
        if high_load_nodes:
            recommendations.append(OptimizationRecommendation(
                category='capacity_planning',
                priority='HIGH',
                title='处理高负载节点',
                description=f'发现 {len(high_load_nodes)} 个高负载节点,需要优化或扩容',
                impact='减少节点压力,提高任务执行效率,避免性能瓶颈',
                implementation=[
                    '分析高负载节点的工作负载模式',
                    '考虑增加节点硬件资源',
                    '优化任务调度策略',
                    '实施容器资源限制'
                ],
                config_changes={
                    'yarn.scheduler.capacity.maximum-am-resource-percent': '0.1',
                    'yarn.scheduler.capacity.maximum-applications': '10000'
                },
                estimated_improvement='高负载节点压力减少30%,任务执行时间缩短15%'
            ))
        
        return recommendations
    
    def _generate_queue_recommendations(self, queue_analysis: Dict[str, Any]) -> List[OptimizationRecommendation]:
        """
        基于队列分析生成建议
        
        Args:
            queue_analysis: 队列分析结果
            
        Returns:
            List[OptimizationRecommendation]: 建议列表
        """
        recommendations = []
        
        overall = queue_analysis.get('overall', {})
        underutilized_queues = overall.get('underutilized_queues', 0)
        overutilized_queues = overall.get('overutilized_queues', 0)
        
        # 队列利用率不均建议
        if underutilized_queues > 0 or overutilized_queues > 0:
            recommendations.append(OptimizationRecommendation(
                category='queue_management',
                priority='MEDIUM',
                title='优化队列容量配置',
                description=f'发现 {underutilized_queues} 个低利用率队列和 {overutilized_queues} 个过载队列',
                impact='提高队列资源利用率,减少资源浪费,改善应用响应时间',
                implementation=[
                    '重新评估队列容量分配',
                    '启用队列间资源共享',
                    '调整队列优先级',
                    '实施动态队列管理'
                ],
                config_changes={
                    'yarn.scheduler.capacity.maximum-capacity': 'adjust per queue',
                    'yarn.scheduler.capacity.user-limit-factor': '1',
                    'yarn.scheduler.capacity.maximum-am-resource-percent': '0.1'
                },
                estimated_improvement='队列利用率提高20%,资源浪费减少15%'
            ))
        
        # 检查具体队列问题
        for queue_name, analysis in queue_analysis.items():
            if queue_name == 'overall':
                continue
            
            efficiency = analysis.get('queue_efficiency', 0)
            if efficiency < 0.3:
                recommendations.append(OptimizationRecommendation(
                    category='queue_optimization',
                    priority='LOW',
                    title=f'优化队列 {queue_name} 的使用效率',
                    description=f'队列 {queue_name} 效率为 {efficiency:.2f},利用率较低',
                    impact='提高特定队列的资源利用率',
                    implementation=[
                        f'分析队列 {queue_name} 的工作负载模式',
                        '考虑减少队列容量或合并队列',
                        '优化应用提交策略'
                    ],
                    config_changes={
                        f'yarn.scheduler.capacity.root.{queue_name}.capacity': 'reduce by 20%'
                    },
                    estimated_improvement=f'队列 {queue_name} 效率提高30%'
                ))
        
        return recommendations
    
    def _generate_bottleneck_recommendations(self, bottlenecks: List[Dict[str, Any]]) -> List[OptimizationRecommendation]:
        """
        基于瓶颈分析生成建议
        
        Args:
            bottlenecks: 瓶颈列表
            
        Returns:
            List[OptimizationRecommendation]: 建议列表
        """
        recommendations = []
        
        # 按瓶颈类型分组
        bottleneck_groups = defaultdict(list)
        for bottleneck in bottlenecks:
            key = f"{bottleneck['type']}_{bottleneck['category']}"
            bottleneck_groups[key].append(bottleneck)
        
        for group_key, group_bottlenecks in bottleneck_groups.items():
            if not group_bottlenecks:
                continue
            
            bottleneck_type = group_bottlenecks[0]['type']
            category = group_bottlenecks[0]['category']
            severity = max(b['severity'] for b in group_bottlenecks)
            
            if bottleneck_type == 'RESOURCE_BOTTLENECK':
                if category == 'MEMORY':
                    recommendations.append(OptimizationRecommendation(
                        category='resource_scaling',
                        priority=severity,
                        title='解决内存资源瓶颈',
                        description=f'检测到 {len(group_bottlenecks)} 个内存瓶颈',
                        impact='消除内存瓶颈,提高应用执行效率',
                        implementation=[
                            '增加集群内存容量',
                            '优化应用内存使用',
                            '启用内存压缩和回收',
                            '调整内存分配策略'
                        ],
                        config_changes={
                            'yarn.nodemanager.resource.memory-mb': 'increase',
                            'yarn.scheduler.maximum-allocation-mb': 'increase',
                            'yarn.nodemanager.vmem-pmem-ratio': '2.1'
                        },
                        estimated_improvement='内存瓶颈消除,应用等待时间减少50%'
                    ))
                elif category == 'CPU':
                    recommendations.append(OptimizationRecommendation(
                        category='resource_scaling',
                        priority=severity,
                        title='解决CPU资源瓶颈',
                        description=f'检测到 {len(group_bottlenecks)} 个CPU瓶颈',
                        impact='消除CPU瓶颈,提高任务执行速度',
                        implementation=[
                            '增加集群CPU核心数',
                            '优化任务并行度',
                            '调整CPU分配策略',
                            '启用CPU隔离'
                        ],
                        config_changes={
                            'yarn.nodemanager.resource.cpu-vcores': 'increase',
                            'yarn.scheduler.maximum-allocation-vcores': 'increase'
                        },
                        estimated_improvement='CPU瓶颈消除,任务执行时间减少30%'
                    ))
            
            elif bottleneck_type == 'APPLICATION_BOTTLENECK':
                recommendations.append(OptimizationRecommendation(
                    category='scheduling_optimization',
                    priority=severity,
                    title='优化应用调度性能',
                    description=f'检测到应用调度瓶颈,{len(group_bottlenecks)} 个相关问题',
                    impact='减少应用等待时间,提高调度效率',
                    implementation=[
                        '优化调度器配置',
                        '调整应用优先级',
                        '启用抢占机制',
                        '优化资源分配算法'
                    ],
                    config_changes={
                        'yarn.scheduler.capacity.resource-calculator': 'DominantResourceCalculator',
                        'yarn.scheduler.capacity.preemption.enable': 'true',
                        'yarn.scheduler.capacity.preemption.monitoring_interval': '3000'
                    },
                    estimated_improvement='应用调度延迟减少40%,吞吐量提高25%'
                ))
        
        return recommendations
    
    def _estimate_implementation_duration(self, recommendations: List[OptimizationRecommendation]) -> str:
        """
        估算实施时长
        
        Args:
            recommendations: 建议列表
            
        Returns:
            str: 估算时长
        """
        total_days = 0
        
        for rec in recommendations:
            if rec.priority == 'HIGH':
                total_days += 3
            elif rec.priority == 'MEDIUM':
                total_days += 2
            else:
                total_days += 1
        
        if total_days <= 7:
            return f"{total_days} 天"
        elif total_days <= 30:
            return f"{total_days // 7} 周"
        else:
            return f"{total_days // 30} 个月"
    
    def _create_implementation_phases(self, recommendations: List[OptimizationRecommendation]) -> List[Dict[str, Any]]:
        """
        创建实施阶段
        
        Args:
            recommendations: 建议列表
            
        Returns:
            List[Dict[str, Any]]: 实施阶段
        """
        phases = [
            {
                'phase': 1,
                'name': '紧急优化',
                'description': '处理高优先级问题,解决关键瓶颈',
                'recommendations': [r for r in recommendations if r.priority == 'HIGH'],
                'estimated_duration': '1-2 周'
            },
            {
                'phase': 2,
                'name': '性能提升',
                'description': '实施中等优先级优化,提升整体性能',
                'recommendations': [r for r in recommendations if r.priority == 'MEDIUM'],
                'estimated_duration': '2-3 周'
            },
            {
                'phase': 3,
                'name': '精细调优',
                'description': '实施低优先级优化,进行精细调优',
                'recommendations': [r for r in recommendations if r.priority == 'LOW'],
                'estimated_duration': '1-2 周'
            }
        ]
        
        return [phase for phase in phases if phase['recommendations']]
    
    def _consolidate_config_changes(self, recommendations: List[OptimizationRecommendation]) -> Dict[str, Any]:
        """
        合并配置变更
        
        Args:
            recommendations: 建议列表
            
        Returns:
            Dict[str, Any]: 合并后的配置变更
        """
        consolidated = {}
        
        for rec in recommendations:
            for key, value in rec.config_changes.items():
                if key not in consolidated:
                    consolidated[key] = []
                consolidated[key].append({
                    'recommendation': rec.title,
                    'change': value,
                    'priority': rec.priority
                })
        
        return consolidated
    
    def _assess_implementation_risks(self, recommendations: List[OptimizationRecommendation]) -> List[Dict[str, str]]:
        """
        评估实施风险
        
        Args:
            recommendations: 建议列表
            
        Returns:
            List[Dict[str, str]]: 风险评估
        """
        risks = [
            {
                'risk': '配置变更风险',
                'level': 'MEDIUM',
                'description': '配置变更可能导致服务中断或性能下降',
                'mitigation': '在测试环境验证配置,分阶段部署,准备回滚方案'
            },
            {
                'risk': '资源扩容风险',
                'level': 'LOW',
                'description': '硬件资源扩容可能涉及成本增加',
                'mitigation': '进行成本效益分析,考虑云资源弹性扩容'
            },
            {
                'risk': '应用兼容性风险',
                'level': 'MEDIUM',
                'description': '优化可能影响现有应用的兼容性',
                'mitigation': '充分测试现有应用,提供兼容性指南'
            }
        ]
        
        return risks
    
    def _define_success_metrics(self, recommendations: List[OptimizationRecommendation]) -> List[Dict[str, str]]:
        """
        定义成功指标
        
        Args:
            recommendations: 建议列表
            
        Returns:
            List[Dict[str, str]]: 成功指标
        """
        metrics = [
            {
                'metric': '资源利用率',
                'target': '内存利用率保持在70-80%,CPU利用率保持在60-75%',
                'measurement': '通过YARN监控系统持续监测'
            },
            {
                'metric': '应用性能',
                'target': '应用成功率提高到95%以上,平均完成时间减少20%',
                'measurement': '分析应用历史记录和性能日志'
            },
            {
                'metric': '集群稳定性',
                'target': '节点可用性保持在98%以上,故障恢复时间减少50%',
                'measurement': '监控节点健康状态和故障统计'
            },
            {
                'metric': '用户满意度',
                'target': '作业提交到完成的端到端时间减少30%',
                'measurement': '用户反馈和作业执行时间统计'
            }
        ]
        
        return metrics

# 使用示例
if __name__ == "__main__":
    # 创建性能优化器
    optimizer = YARNPerformanceOptimizer()
    
    # 模拟性能数据
    from datetime import datetime, timedelta
    import random
    
    # 生成模拟的集群指标
    cluster_metrics = []
    for i in range(24):  # 24小时的数据
        timestamp = int((datetime.now() - timedelta(hours=23-i)).timestamp() * 1000)
        metrics = ClusterMetrics(
            timestamp=timestamp,
            total_memory_mb=102400,
            allocated_memory_mb=random.randint(70000, 95000),
            available_memory_mb=random.randint(7000, 32000),
            total_vcores=64,
            allocated_vcores=random.randint(45, 60),
            available_vcores=random.randint(4, 19),
            total_nodes=10,
            active_nodes=random.randint(9, 10),
            decommissioned_nodes=0,
            lost_nodes=random.randint(0, 1),
            unhealthy_nodes=random.randint(0, 1),
            total_applications=random.randint(100, 200),
            running_applications=random.randint(30, 60),
            pending_applications=random.randint(5, 25),
            completed_applications=random.randint(80, 150),
            failed_applications=random.randint(2, 15),
            killed_applications=random.randint(1, 5),
            containers_allocated=random.randint(200, 400),
            containers_pending=random.randint(10, 50),
            containers_reserved=random.randint(5, 20)
        )
        cluster_metrics.append(metrics)
    
    # 生成模拟的节点指标
    node_metrics = {}
    for node_id in [f"node{i}.cluster.com" for i in range(1, 11)]:
        node_metrics[node_id] = []
        for i in range(24):
            timestamp = int((datetime.now() - timedelta(hours=23-i)).timestamp() * 1000)
            metrics = NodeMetrics(
                node_id=node_id,
                timestamp=timestamp,
                memory_mb=10240,
                allocated_memory_mb=random.randint(6000, 9500),
                vcores=8,
                allocated_vcores=random.randint(5, 8),
                num_containers=random.randint(10, 25),
                cpu_usage_percent=random.uniform(40.0, 90.0),
                memory_usage_percent=random.uniform(60.0, 95.0),
                disk_usage_percent=random.uniform(30.0, 80.0),
                network_io_mb=random.uniform(50.0, 200.0),
                disk_io_mb=random.uniform(100.0, 500.0),
                load_average=random.uniform(2.0, 6.0),
                health_status="HEALTHY" if random.random() > 0.05 else "UNHEALTHY"
            )
            node_metrics[node_id].append(metrics)
    
    # 生成模拟的队列指标
    queue_metrics = {}
    for queue_name in ["default", "production", "development"]:
        queue_metrics[queue_name] = []
        capacity = 50.0 if queue_name == "default" else 25.0
        for i in range(24):
            timestamp = int((datetime.now() - timedelta(hours=23-i)).timestamp() * 1000)
            metrics = QueueMetrics(
                queue_name=queue_name,
                timestamp=timestamp,
                capacity_percent=capacity,
                max_capacity_percent=100.0,
                used_capacity_percent=random.uniform(capacity * 0.3, capacity * 0.95),
                absolute_capacity_percent=capacity,
                absolute_max_capacity_percent=100.0,
                absolute_used_capacity_percent=random.uniform(capacity * 0.3, capacity * 0.95),
                num_applications=random.randint(10, 40),
                num_pending_applications=random.randint(2, 15),
                num_active_applications=random.randint(5, 25),
                allocated_memory_mb=random.randint(10000, 30000),
                allocated_vcores=random.randint(15, 35),
                pending_memory_mb=random.randint(1000, 8000),
                pending_vcores=random.randint(2, 12)
            )
            queue_metrics[queue_name].append(metrics)
    
    # 分析集群性能
    print("=== YARN性能分析 ===")
    performance_analysis = optimizer.analyze_cluster_performance(cluster_metrics, node_metrics, queue_metrics)
    
    # 显示性能评分
    performance_score = performance_analysis.get('performance_score', {})
    print(f"\n性能评分:")
    print(f"  综合评分: {performance_score.get('overall_score', 0):.1f}/100")
    print(f"  资源利用率评分: {performance_score.get('resource_utilization_score', 0):.1f}/100")
    print(f"  应用性能评分: {performance_score.get('application_performance_score', 0):.1f}/100")
    print(f"  集群健康评分: {performance_score.get('cluster_health_score', 0):.1f}/100")
    print(f"  队列效率评分: {performance_score.get('queue_efficiency_score', 0):.1f}/100")
    
    # 显示瓶颈分析
    bottlenecks = performance_analysis.get('bottlenecks', [])
    if bottlenecks:
        print(f"\n发现的性能瓶颈:")
        for i, bottleneck in enumerate(bottlenecks[:5], 1):  # 显示前5个瓶颈
            print(f"  {i}. [{bottleneck['severity']}] {bottleneck['description']}")
            print(f"     影响: {bottleneck['impact']}")
    
    # 生成优化建议
    print(f"\n=== 优化建议生成 ===")
    recommendations = optimizer.generate_optimization_recommendations(performance_analysis)
    
    print(f"生成了 {len(recommendations)} 条优化建议:")
    for i, rec in enumerate(recommendations[:5], 1):  # 显示前5条建议
        print(f"\n{i}. [{rec.priority}] {rec.title}")
        print(f"   类别: {rec.category}")
        print(f"   描述: {rec.description}")
        print(f"   预期影响: {rec.impact}")
        print(f"   预估改进: {rec.estimated_improvement}")
    
    # 生成优化计划
    print(f"\n=== 优化实施计划 ===")
    optimization_plan = optimizer.generate_optimization_plan(recommendations)
    
    plan_overview = optimization_plan.get('plan_overview', {})
    print(f"计划概览:")
    print(f"  总建议数: {plan_overview.get('total_recommendations', 0)}")
    print(f"  高优先级: {plan_overview.get('high_priority', 0)}")
    print(f"  中优先级: {plan_overview.get('medium_priority', 0)}")
    print(f"  低优先级: {plan_overview.get('low_priority', 0)}")
    print(f"  预估实施时长: {plan_overview.get('estimated_duration', 'N/A')}")
    
    # 显示实施阶段
    phases = optimization_plan.get('implementation_phases', [])
    if phases:
        print(f"\n实施阶段:")
        for phase in phases:
            print(f"  阶段 {phase['phase']}: {phase['name']}")
            print(f"    描述: {phase['description']}")
            print(f"    建议数: {len(phase['recommendations'])}")
            print(f"    预估时长: {phase['estimated_duration']}")
    
    # 显示成功指标
    success_metrics = optimization_plan.get('success_metrics', [])
    if success_metrics:
        print(f"\n成功指标:")
        for metric in success_metrics:
             print(f"  {metric['metric']}: {metric['target']}")

5.3 最佳实践

from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass, field
from datetime import datetime
import json
import logging
from enum import Enum

class BestPracticeCategory(Enum):
    """最佳实践类别"""
    RESOURCE_MANAGEMENT = "resource_management"
    SECURITY = "security"
    PERFORMANCE = "performance"
    MONITORING = "monitoring"
    CONFIGURATION = "configuration"
    DEPLOYMENT = "deployment"
    MAINTENANCE = "maintenance"

@dataclass
class BestPractice:
    """最佳实践"""
    id: str
    category: BestPracticeCategory
    title: str
    description: str
    importance: str  # HIGH, MEDIUM, LOW
    implementation_steps: List[str]
    configuration_examples: Dict[str, str]
    benefits: List[str]
    risks_if_ignored: List[str]
    related_practices: List[str] = field(default_factory=list)
    
class YARNBestPracticesGuide:
    """
    YARN最佳实践指南
    """
    
    def __init__(self):
        self.practices = self._initialize_best_practices()
        self.logger = logging.getLogger(__name__)
    
    def _initialize_best_practices(self) -> List[BestPractice]:
        """
        初始化最佳实践列表
        
        Returns:
            List[BestPractice]: 最佳实践列表
        """
        practices = [
            # 资源管理最佳实践
            BestPractice(
                id="rm_001",
                category=BestPracticeCategory.RESOURCE_MANAGEMENT,
                title="合理配置内存和CPU资源",
                description="根据工作负载特性合理配置NodeManager的内存和CPU资源,避免资源浪费或不足",
                importance="HIGH",
                implementation_steps=[
                    "分析历史工作负载模式",
                    "计算合理的内存和CPU配比",
                    "配置yarn.nodemanager.resource.memory-mb",
                    "配置yarn.nodemanager.resource.cpu-vcores",
                    "设置合理的容器最大资源限制",
                    "监控资源利用率并调整"
                ],
                configuration_examples={
                    "yarn.nodemanager.resource.memory-mb": "14336",  # 14GB for 16GB node
                    "yarn.nodemanager.resource.cpu-vcores": "7",     # 7 cores for 8-core node
                    "yarn.scheduler.maximum-allocation-mb": "14336",
                    "yarn.scheduler.maximum-allocation-vcores": "7",
                    "yarn.nodemanager.vmem-pmem-ratio": "2.1"
                },
                benefits=[
                    "提高资源利用率",
                    "减少资源争用",
                    "提高应用性能",
                    "降低运营成本"
                ],
                risks_if_ignored=[
                    "资源浪费",
                    "应用性能下降",
                    "集群不稳定",
                    "成本增加"
                ],
                related_practices=["rm_002", "rm_003"]
            ),
            
            BestPractice(
                id="rm_002",
                category=BestPracticeCategory.RESOURCE_MANAGEMENT,
                title="启用资源隔离和限制",
                description="使用CGroups等技术实现容器间的资源隔离,防止资源争用",
                importance="HIGH",
                implementation_steps=[
                    "启用Linux CGroups",
                    "配置CPU隔离",
                    "配置内存隔离",
                    "设置磁盘I/O限制",
                    "配置网络带宽限制",
                    "监控隔离效果"
                ],
                configuration_examples={
                    "yarn.nodemanager.container-executor.class": "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
                    "yarn.nodemanager.linux-container-executor.cgroups.hierarchy": "/hadoop-yarn",
                    "yarn.nodemanager.linux-container-executor.cgroups.mount": "true",
                    "yarn.nodemanager.linux-container-executor.cgroups.mount-path": "/sys/fs/cgroup",
                    "yarn.nodemanager.resource.percentage-physical-cpu-limit": "80"
                },
                benefits=[
                    "防止资源争用",
                    "提高系统稳定性",
                    "保证服务质量",
                    "提高多租户安全性"
                ],
                risks_if_ignored=[
                    "容器间资源争用",
                    "系统不稳定",
                    "性能不可预测",
                    "安全风险"
                ],
                related_practices=["rm_001", "sec_001"]
            ),
            
            BestPractice(
                id="rm_003",
                category=BestPracticeCategory.RESOURCE_MANAGEMENT,
                title="实施智能队列管理",
                description="根据业务需求配置合理的队列结构和资源分配策略",
                importance="HIGH",
                implementation_steps=[
                    "分析业务需求和优先级",
                    "设计队列层次结构",
                    "配置队列容量和限制",
                    "设置用户和组限制",
                    "启用抢占机制",
                    "监控队列使用情况"
                ],
                configuration_examples={
                    "yarn.scheduler.capacity.root.queues": "production,development,adhoc",
                    "yarn.scheduler.capacity.root.production.capacity": "60",
                    "yarn.scheduler.capacity.root.development.capacity": "30",
                    "yarn.scheduler.capacity.root.adhoc.capacity": "10",
                    "yarn.scheduler.capacity.root.production.maximum-capacity": "80",
                    "yarn.scheduler.capacity.preemption.enable": "true"
                },
                benefits=[
                    "资源公平分配",
                    "提高资源利用率",
                    "支持多租户",
                    "保证SLA"
                ],
                risks_if_ignored=[
                    "资源分配不公",
                    "优先级混乱",
                    "SLA无法保证",
                    "用户体验差"
                ],
                related_practices=["rm_001", "rm_004"]
            ),
            
            # 安全最佳实践
            BestPractice(
                id="sec_001",
                category=BestPracticeCategory.SECURITY,
                title="启用Kerberos认证",
                description="在生产环境中启用Kerberos认证,确保集群安全",
                importance="HIGH",
                implementation_steps=[
                    "部署Kerberos KDC",
                    "创建服务主体",
                    "配置YARN服务认证",
                    "配置客户端认证",
                    "测试认证功能",
                    "监控认证状态"
                ],
                configuration_examples={
                    "hadoop.security.authentication": "kerberos",
                    "hadoop.security.authorization": "true",
                    "yarn.resourcemanager.keytab": "/etc/security/keytabs/rm.service.keytab",
                    "yarn.resourcemanager.principal": "rm/_HOST@REALM.COM",
                    "yarn.nodemanager.keytab": "/etc/security/keytabs/nm.service.keytab",
                    "yarn.nodemanager.principal": "nm/_HOST@REALM.COM"
                },
                benefits=[
                    "防止未授权访问",
                    "保护数据安全",
                    "满足合规要求",
                    "提高系统可信度"
                ],
                risks_if_ignored=[
                    "安全漏洞",
                    "数据泄露",
                    "合规风险",
                    "声誉损失"
                ],
                related_practices=["sec_002", "sec_003"]
            ),
            
            BestPractice(
                id="sec_002",
                category=BestPracticeCategory.SECURITY,
                title="配置SSL/TLS加密",
                description="启用SSL/TLS加密保护数据传输安全",
                importance="HIGH",
                implementation_steps=[
                    "生成SSL证书",
                    "配置ResourceManager SSL",
                    "配置NodeManager SSL",
                    "配置Web UI SSL",
                    "配置客户端SSL",
                    "验证加密效果"
                ],
                configuration_examples={
                    "hadoop.ssl.require.client.cert": "false",
                    "hadoop.ssl.hostname.verifier": "DEFAULT",
                    "hadoop.ssl.keystores.factory.class": "org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory",
                    "hadoop.ssl.server.conf": "ssl-server.xml",
                    "hadoop.ssl.client.conf": "ssl-client.xml",
                    "yarn.http.policy": "HTTPS_ONLY"
                },
                benefits=[
                    "保护数据传输",
                    "防止中间人攻击",
                    "满足安全标准",
                    "增强用户信任"
                ],
                risks_if_ignored=[
                    "数据泄露",
                    "网络攻击",
                    "合规问题",
                    "安全事故"
                ],
                related_practices=["sec_001", "sec_003"]
            ),
            
            # 性能最佳实践
            BestPractice(
                id="perf_001",
                category=BestPracticeCategory.PERFORMANCE,
                title="优化JVM参数",
                description="根据工作负载特性优化ResourceManager和NodeManager的JVM参数",
                importance="MEDIUM",
                implementation_steps=[
                    "分析内存使用模式",
                    "配置堆内存大小",
                    "选择合适的垃圾收集器",
                    "调整GC参数",
                    "启用JVM监控",
                    "持续优化调整"
                ],
                configuration_examples={
                    "YARN_RESOURCEMANAGER_HEAPSIZE": "4096",
                    "YARN_NODEMANAGER_HEAPSIZE": "2048",
                    "YARN_RESOURCEMANAGER_OPTS": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200",
                    "YARN_NODEMANAGER_OPTS": "-XX:+UseG1GC -XX:+PrintGCDetails"
                },
                benefits=[
                    "提高响应速度",
                    "减少GC停顿",
                    "提高吞吐量",
                    "降低延迟"
                ],
                risks_if_ignored=[
                    "性能下降",
                    "频繁GC",
                    "内存溢出",
                    "系统不稳定"
                ],
                related_practices=["perf_002", "mon_001"]
            ),
            
            # 监控最佳实践
            BestPractice(
                id="mon_001",
                category=BestPracticeCategory.MONITORING,
                title="建立全面的监控体系",
                description="建立包括资源、性能、健康状态等多维度的监控体系",
                importance="HIGH",
                implementation_steps=[
                    "部署监控工具",
                    "配置关键指标收集",
                    "设置告警规则",
                    "创建监控仪表板",
                    "建立告警响应流程",
                    "定期审查监控效果"
                ],
                configuration_examples={
                    "yarn.resourcemanager.webapp.address": "0.0.0.0:8088",
                    "yarn.nodemanager.webapp.address": "0.0.0.0:8042",
                    "yarn.timeline-service.enabled": "true",
                    "yarn.timeline-service.hostname": "timeline-server",
                    "yarn.resourcemanager.system-metrics-publisher.enabled": "true"
                },
                benefits=[
                    "及时发现问题",
                    "预防故障",
                    "优化性能",
                    "提高可用性"
                ],
                risks_if_ignored=[
                    "故障发现延迟",
                    "性能问题",
                    "服务中断",
                    "数据丢失"
                ],
                related_practices=["mon_002", "maint_001"]
            )
        ]
        
        return practices
    
    def get_practices_by_category(self, category: BestPracticeCategory) -> List[BestPractice]:
        """
        根据类别获取最佳实践
        
        Args:
            category: 实践类别
            
        Returns:
            List[BestPractice]: 该类别的最佳实践列表
        """
        return [p for p in self.practices if p.category == category]
    
    def get_practices_by_importance(self, importance: str) -> List[BestPractice]:
        """
        根据重要性获取最佳实践
        
        Args:
            importance: 重要性级别 (HIGH, MEDIUM, LOW)
            
        Returns:
            List[BestPractice]: 该重要性级别的最佳实践列表
        """
        return [p for p in self.practices if p.importance == importance]
    
    def get_practice_by_id(self, practice_id: str) -> Optional[BestPractice]:
        """
        根据ID获取最佳实践
        
        Args:
            practice_id: 实践ID
            
        Returns:
            Optional[BestPractice]: 最佳实践对象,如果不存在则返回None
        """
        for practice in self.practices:
            if practice.id == practice_id:
                return practice
        return None
    
    def generate_implementation_checklist(self, practice_ids: List[str]) -> Dict[str, Any]:
        """
        生成实施检查清单
        
        Args:
            practice_ids: 要实施的实践ID列表
            
        Returns:
            Dict[str, Any]: 实施检查清单
        """
        checklist = {
            'overview': {
                'total_practices': len(practice_ids),
                'high_priority': 0,
                'medium_priority': 0,
                'low_priority': 0
            },
            'categories': {},
            'implementation_plan': [],
            'configuration_summary': {},
            'estimated_effort': self._estimate_implementation_effort(practice_ids)
        }
        
        for practice_id in practice_ids:
            practice = self.get_practice_by_id(practice_id)
            if not practice:
                continue
            
            # 统计优先级
            if practice.importance == 'HIGH':
                checklist['overview']['high_priority'] += 1
            elif practice.importance == 'MEDIUM':
                checklist['overview']['medium_priority'] += 1
            else:
                checklist['overview']['low_priority'] += 1
            
            # 按类别分组
            category_name = practice.category.value
            if category_name not in checklist['categories']:
                checklist['categories'][category_name] = []
            checklist['categories'][category_name].append({
                'id': practice.id,
                'title': practice.title,
                'importance': practice.importance,
                'steps': practice.implementation_steps
            })
            
            # 合并配置
            checklist['configuration_summary'].update(practice.configuration_examples)
        
        # 生成实施计划
        checklist['implementation_plan'] = self._create_implementation_plan(practice_ids)
        
        return checklist
    
    def validate_current_setup(self, current_config: Dict[str, str]) -> Dict[str, Any]:
        """
        验证当前配置是否符合最佳实践
        
        Args:
            current_config: 当前配置
            
        Returns:
            Dict[str, Any]: 验证结果
        """
        validation_result = {
            'overall_score': 0,
            'category_scores': {},
            'compliant_practices': [],
            'non_compliant_practices': [],
            'missing_configurations': [],
            'recommendations': []
        }
        
        total_practices = len(self.practices)
        compliant_count = 0
        
        for practice in self.practices:
            is_compliant = self._check_practice_compliance(practice, current_config)
            
            if is_compliant:
                compliant_count += 1
                validation_result['compliant_practices'].append({
                    'id': practice.id,
                    'title': practice.title,
                    'category': practice.category.value
                })
            else:
                validation_result['non_compliant_practices'].append({
                    'id': practice.id,
                    'title': practice.title,
                    'category': practice.category.value,
                    'importance': practice.importance,
                    'missing_configs': self._get_missing_configs(practice, current_config)
                })
        
        # 计算总体评分
        validation_result['overall_score'] = round((compliant_count / total_practices) * 100, 2)
        
        # 按类别计算评分
        for category in BestPracticeCategory:
            category_practices = self.get_practices_by_category(category)
            category_compliant = len([p for p in category_practices 
                                    if self._check_practice_compliance(p, current_config)])
            if category_practices:
                score = round((category_compliant / len(category_practices)) * 100, 2)
                validation_result['category_scores'][category.value] = score
        
        # 生成改进建议
        validation_result['recommendations'] = self._generate_improvement_recommendations(
            validation_result['non_compliant_practices']
        )
        
        return validation_result
    
    def _check_practice_compliance(self, practice: BestPractice, current_config: Dict[str, str]) -> bool:
        """
        检查实践是否合规
        
        Args:
            practice: 最佳实践
            current_config: 当前配置
            
        Returns:
            bool: 是否合规
        """
        # 检查关键配置是否存在
        required_configs = practice.configuration_examples.keys()
        for config_key in required_configs:
            if config_key not in current_config:
                return False
        
        # 这里可以添加更复杂的合规性检查逻辑
        return True
    
    def _get_missing_configs(self, practice: BestPractice, current_config: Dict[str, str]) -> List[str]:
        """
        获取缺失的配置
        
        Args:
            practice: 最佳实践
            current_config: 当前配置
            
        Returns:
            List[str]: 缺失的配置键列表
        """
        missing = []
        for config_key in practice.configuration_examples.keys():
            if config_key not in current_config:
                missing.append(config_key)
        return missing
    
    def _estimate_implementation_effort(self, practice_ids: List[str]) -> Dict[str, Any]:
        """
        估算实施工作量
        
        Args:
            practice_ids: 实践ID列表
            
        Returns:
            Dict[str, Any]: 工作量估算
        """
        effort_mapping = {
            'HIGH': 8,    # 8小时
            'MEDIUM': 4,  # 4小时
            'LOW': 2      # 2小时
        }
        
        total_hours = 0
        effort_breakdown = {'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        
        for practice_id in practice_ids:
            practice = self.get_practice_by_id(practice_id)
            if practice:
                hours = effort_mapping.get(practice.importance, 2)
                total_hours += hours
                effort_breakdown[practice.importance] += hours
        
        return {
            'total_hours': total_hours,
            'total_days': round(total_hours / 8, 1),
            'breakdown': effort_breakdown,
            'estimated_duration': f"{round(total_hours / 8, 1)} 工作日"
        }
    
    def _create_implementation_plan(self, practice_ids: List[str]) -> List[Dict[str, Any]]:
        """
        创建实施计划
        
        Args:
            practice_ids: 实践ID列表
            
        Returns:
            List[Dict[str, Any]]: 实施计划
        """
        # 按重要性排序
        practices = []
        for practice_id in practice_ids:
            practice = self.get_practice_by_id(practice_id)
            if practice:
                practices.append(practice)
        
        # 按重要性和依赖关系排序
        priority_order = {'HIGH': 0, 'MEDIUM': 1, 'LOW': 2}
        practices.sort(key=lambda x: priority_order.get(x.importance, 3))
        
        plan = []
        for i, practice in enumerate(practices, 1):
            plan.append({
                'phase': i,
                'practice_id': practice.id,
                'title': practice.title,
                'importance': practice.importance,
                'category': practice.category.value,
                'steps': practice.implementation_steps,
                'dependencies': practice.related_practices,
                'estimated_effort': self._estimate_implementation_effort([practice.id])
            })
        
        return plan
    
    def _generate_improvement_recommendations(self, non_compliant_practices: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """
        生成改进建议
        
        Args:
            non_compliant_practices: 不合规的实践列表
            
        Returns:
            List[Dict[str, str]]: 改进建议列表
        """
        recommendations = []
        
        # 按重要性分组
        high_priority = [p for p in non_compliant_practices if p['importance'] == 'HIGH']
        medium_priority = [p for p in non_compliant_practices if p['importance'] == 'MEDIUM']
        
        if high_priority:
            recommendations.append({
                'priority': 'HIGH',
                'title': '立即处理高优先级安全和性能问题',
                'description': f'发现 {len(high_priority)} 个高优先级问题需要立即处理',
                'action': '优先实施高重要性的最佳实践,特别是安全相关配置'
            })
        
        if medium_priority:
            recommendations.append({
                'priority': 'MEDIUM',
                'title': '计划实施性能优化措施',
                'description': f'发现 {len(medium_priority)} 个中等优先级优化机会',
                'action': '制定计划逐步实施性能和监控相关的最佳实践'
            })
        
        # 按类别分析
        category_counts = {}
        for practice in non_compliant_practices:
            category = practice['category']
            category_counts[category] = category_counts.get(category, 0) + 1
        
        if category_counts:
            most_problematic = max(category_counts.items(), key=lambda x: x[1])
            recommendations.append({
                'priority': 'MEDIUM',
                'title': f'重点关注{most_problematic[0]}领域',
                'description': f'{most_problematic[0]}领域有{most_problematic[1]}个待改进项',
                'action': f'集中精力改进{most_problematic[0]}相关的配置和流程'
            })
        
        return recommendations

# 使用示例
if __name__ == "__main__":
    # 创建最佳实践指南
    guide = YARNBestPracticesGuide()
    
    print("=== YARN最佳实践指南 ===")
    
    # 显示所有类别的实践数量
    print("\n各类别最佳实践数量:")
    for category in BestPracticeCategory:
        practices = guide.get_practices_by_category(category)
        print(f"  {category.value}: {len(practices)} 项")
    
    # 显示高优先级实践
    high_priority_practices = guide.get_practices_by_importance('HIGH')
    print(f"\n高优先级最佳实践 ({len(high_priority_practices)} 项):")
    for practice in high_priority_practices:
        print(f"  - [{practice.id}] {practice.title}")
        print(f"    类别: {practice.category.value}")
        print(f"    描述: {practice.description}")
        print(f"    主要收益: {', '.join(practice.benefits[:2])}")
        print()
    
    # 生成实施检查清单
    practice_ids = [p.id for p in high_priority_practices]
    checklist = guide.generate_implementation_checklist(practice_ids)
    
    print("=== 实施检查清单 ===")
    print(f"总实践数: {checklist['overview']['total_practices']}")
    print(f"高优先级: {checklist['overview']['high_priority']}")
    print(f"预估工作量: {checklist['estimated_effort']['estimated_duration']}")
    
    print("\n按类别分组:")
    for category, practices in checklist['categories'].items():
        print(f"  {category}:")
        for practice in practices:
            print(f"    - {practice['title']} ({practice['importance']})")
    
    # 模拟当前配置验证
    current_config = {
        "yarn.nodemanager.resource.memory-mb": "8192",
        "yarn.nodemanager.resource.cpu-vcores": "4",
        "hadoop.security.authentication": "simple",  # 不安全的配置
        "yarn.resourcemanager.webapp.address": "0.0.0.0:8088"
    }
    
    print("\n=== 配置验证结果 ===")
    validation_result = guide.validate_current_setup(current_config)
    
    print(f"总体评分: {validation_result['overall_score']:.1f}/100")
    print(f"合规实践: {len(validation_result['compliant_practices'])} 项")
    print(f"不合规实践: {len(validation_result['non_compliant_practices'])} 项")
    
    print("\n类别评分:")
    for category, score in validation_result['category_scores'].items():
        print(f"  {category}: {score:.1f}/100")
    
    print("\n改进建议:")
    for rec in validation_result['recommendations']:
        print(f"  [{rec['priority']}] {rec['title']}")
        print(f"    {rec['description']}")
        print(f"    行动: {rec['action']}")
         print()

5.4 故障排除

from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import json
import logging
import re
from enum import Enum

class FaultType(Enum):
    """故障类型"""
    RESOURCE_SHORTAGE = "resource_shortage"
    APPLICATION_FAILURE = "application_failure"
    NODE_FAILURE = "node_failure"
    NETWORK_ISSUE = "network_issue"
    CONFIGURATION_ERROR = "configuration_error"
    PERFORMANCE_DEGRADATION = "performance_degradation"
    SECURITY_ISSUE = "security_issue"

class SeverityLevel(Enum):
    """严重程度"""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

@dataclass
class Symptom:
    """故障症状"""
    id: str
    description: str
    indicators: List[str]
    log_patterns: List[str]
    metrics_thresholds: Dict[str, float]

@dataclass
class Solution:
    """解决方案"""
    id: str
    title: str
    description: str
    steps: List[str]
    configuration_changes: Dict[str, str]
    verification_steps: List[str]
    prevention_measures: List[str]

@dataclass
class TroubleshootingCase:
    """故障排除案例"""
    id: str
    fault_type: FaultType
    severity: SeverityLevel
    title: str
    description: str
    symptoms: List[Symptom]
    root_causes: List[str]
    solutions: List[Solution]
    related_cases: List[str] = field(default_factory=list)

class YARNTroubleshootingGuide:
    """
    YARN故障排除指南
    """
    
    def __init__(self):
        self.cases = self._initialize_troubleshooting_cases()
        self.logger = logging.getLogger(__name__)
    
    def _initialize_troubleshooting_cases(self) -> List[TroubleshootingCase]:
        """
        初始化故障排除案例
        
        Returns:
            List[TroubleshootingCase]: 故障排除案例列表
        """
        cases = [
            # 资源不足问题
            TroubleshootingCase(
                id="case_001",
                fault_type=FaultType.RESOURCE_SHORTAGE,
                severity=SeverityLevel.HIGH,
                title="集群资源不足导致应用排队",
                description="应用程序长时间处于ACCEPTED状态,无法获得足够资源启动",
                symptoms=[
                    Symptom(
                        id="sym_001",
                        description="应用长时间处于ACCEPTED状态",
                        indicators=[
                            "应用状态显示ACCEPTED超过预期时间",
                            "队列中有大量等待的应用",
                            "集群资源利用率接近100%"
                        ],
                        log_patterns=[
                            ".*Application .* is added to the scheduler.*",
                            ".*Cannot allocate container.*insufficient resources.*"
                        ],
                        metrics_thresholds={
                            "cluster_memory_utilization": 0.95,
                            "cluster_cpu_utilization": 0.95,
                            "pending_applications": 10
                        }
                    )
                ],
                root_causes=[
                    "集群总资源不足",
                    "单个应用请求资源过大",
                    "资源碎片化严重",
                    "队列配置不合理",
                    "节点故障导致可用资源减少"
                ],
                solutions=[
                    Solution(
                        id="sol_001",
                        title="增加集群资源",
                        description="通过添加节点或增加现有节点资源来解决资源不足问题",
                        steps=[
                            "评估当前资源使用情况",
                            "计算所需额外资源",
                            "添加新的NodeManager节点",
                            "或增加现有节点的内存/CPU配置",
                            "验证新资源已被ResourceManager识别",
                            "监控资源分配情况"
                        ],
                        configuration_changes={
                            "yarn.nodemanager.resource.memory-mb": "增加内存配置",
                            "yarn.nodemanager.resource.cpu-vcores": "增加CPU核心数"
                        },
                        verification_steps=[
                            "检查ResourceManager Web UI中的集群资源",
                            "验证应用能够正常启动",
                            "监控资源利用率"
                        ],
                        prevention_measures=[
                            "建立资源监控告警",
                            "制定容量规划策略",
                            "定期评估资源需求"
                        ]
                    ),
                    Solution(
                        id="sol_002",
                        title="优化队列配置",
                        description="调整队列容量和限制,提高资源利用效率",
                        steps=[
                            "分析各队列的资源使用模式",
                            "调整队列容量分配",
                            "启用队列抢占机制",
                            "设置合理的用户限制",
                            "配置应用优先级",
                            "重启ResourceManager使配置生效"
                        ],
                        configuration_changes={
                            "yarn.scheduler.capacity.root.*.capacity": "调整队列容量",
                            "yarn.scheduler.capacity.preemption.enable": "true",
                            "yarn.scheduler.capacity.root.*.user-limit-factor": "调整用户限制"
                        },
                        verification_steps=[
                            "检查队列配置是否生效",
                            "观察资源分配是否更均衡",
                            "验证抢占机制是否工作"
                        ],
                        prevention_measures=[
                            "定期审查队列配置",
                            "建立队列使用监控",
                            "制定队列管理策略"
                        ]
                    )
                ],
                related_cases=["case_002", "case_006"]
            ),
            
            # 应用失败问题
            TroubleshootingCase(
                id="case_002",
                fault_type=FaultType.APPLICATION_FAILURE,
                severity=SeverityLevel.MEDIUM,
                title="应用程序频繁失败重启",
                description="应用程序运行过程中频繁失败,导致重启次数过多",
                symptoms=[
                    Symptom(
                        id="sym_002",
                        description="应用状态频繁变为FAILED",
                        indicators=[
                            "应用重启次数超过阈值",
                            "容器退出码非零",
                            "应用日志中出现异常"
                        ],
                        log_patterns=[
                            ".*Application .* failed .* times.*",
                            ".*Container .* exited with exitCode.*",
                            ".*OutOfMemoryError.*",
                            ".*Exception.*"
                        ],
                        metrics_thresholds={
                            "application_failure_rate": 0.1,
                            "container_failure_rate": 0.05,
                            "restart_count": 3
                        }
                    )
                ],
                root_causes=[
                    "应用程序内存不足",
                    "应用程序逻辑错误",
                    "依赖服务不可用",
                    "资源配置不当",
                    "网络连接问题",
                    "数据质量问题"
                ],
                solutions=[
                    Solution(
                        id="sol_003",
                        title="调整应用资源配置",
                        description="根据应用实际需求调整内存和CPU配置",
                        steps=[
                            "分析应用资源使用模式",
                            "检查容器退出日志",
                            "增加应用内存配置",
                            "调整JVM堆内存设置",
                            "优化应用代码",
                            "重新提交应用测试"
                        ],
                        configuration_changes={
                            "mapreduce.map.memory.mb": "增加Map任务内存",
                            "mapreduce.reduce.memory.mb": "增加Reduce任务内存",
                            "mapreduce.map.java.opts": "调整JVM参数",
                            "mapreduce.reduce.java.opts": "调整JVM参数"
                        },
                        verification_steps=[
                            "监控应用内存使用情况",
                            "检查应用是否正常完成",
                            "验证性能是否改善"
                        ],
                        prevention_measures=[
                            "建立应用性能基线",
                            "实施资源使用监控",
                            "定期进行性能测试"
                        ]
                    )
                ],
                related_cases=["case_001", "case_006"]
            ),
            
            # 节点故障问题
            TroubleshootingCase(
                id="case_003",
                fault_type=FaultType.NODE_FAILURE,
                severity=SeverityLevel.CRITICAL,
                title="NodeManager节点失联",
                description="NodeManager节点无法与ResourceManager通信,导致节点不可用",
                symptoms=[
                    Symptom(
                        id="sym_003",
                        description="节点状态显示为LOST或UNHEALTHY",
                        indicators=[
                            "ResourceManager Web UI显示节点不健康",
                            "节点心跳超时",
                            "节点上的容器被杀死"
                        ],
                        log_patterns=[
                            ".*Node .* is LOST.*",
                            ".*Removing node.*",
                            ".*NodeManager .* failed to connect.*"
                        ],
                        metrics_thresholds={
                            "node_heartbeat_interval": 600,  # 10分钟
                            "unhealthy_nodes": 1
                        }
                    )
                ],
                root_causes=[
                    "NodeManager进程崩溃",
                    "网络连接问题",
                    "磁盘空间不足",
                    "系统资源耗尽",
                    "配置错误",
                    "硬件故障"
                ],
                solutions=[
                    Solution(
                        id="sol_004",
                        title="重启NodeManager服务",
                        description="重启失联的NodeManager服务恢复节点",
                        steps=[
                            "登录到失联节点",
                            "检查NodeManager进程状态",
                            "查看NodeManager日志",
                            "检查系统资源使用情况",
                            "重启NodeManager服务",
                            "验证节点重新加入集群"
                        ],
                        configuration_changes={},
                        verification_steps=[
                            "检查ResourceManager Web UI中节点状态",
                            "验证节点可以接受新任务",
                            "监控节点稳定性"
                        ],
                        prevention_measures=[
                            "建立节点健康监控",
                            "配置自动重启机制",
                            "定期检查系统资源"
                        ]
                    )
                ],
                related_cases=["case_004", "case_005"]
            ),
            
            # 性能问题
            TroubleshootingCase(
                id="case_004",
                fault_type=FaultType.PERFORMANCE_DEGRADATION,
                severity=SeverityLevel.MEDIUM,
                title="应用执行速度缓慢",
                description="应用程序执行时间比预期长,整体性能下降",
                symptoms=[
                    Symptom(
                        id="sym_004",
                        description="应用执行时间异常长",
                        indicators=[
                            "任务执行时间超过历史平均值",
                            "资源利用率低",
                            "存在慢任务"
                        ],
                        log_patterns=[
                            ".*Task .* is running slowly.*",
                            ".*Speculative execution.*"
                        ],
                        metrics_thresholds={
                            "task_execution_time": 3600,  # 1小时
                            "slow_tasks_ratio": 0.1
                        }
                    )
                ],
                root_causes=[
                    "数据倾斜",
                    "资源配置不当",
                    "网络瓶颈",
                    "磁盘I/O瓶颈",
                    "算法效率低",
                    "垃圾回收频繁"
                ],
                solutions=[
                    Solution(
                        id="sol_005",
                        title="启用推测执行",
                        description="启用推测执行机制处理慢任务",
                        steps=[
                            "分析慢任务分布",
                            "启用Map推测执行",
                            "启用Reduce推测执行",
                            "调整推测执行阈值",
                            "监控推测执行效果"
                        ],
                        configuration_changes={
                            "mapreduce.map.speculative": "true",
                            "mapreduce.reduce.speculative": "true",
                            "mapreduce.job.speculative.slowtaskthreshold": "0.8"
                        },
                        verification_steps=[
                            "检查推测执行是否生效",
                            "监控任务完成时间",
                            "验证整体性能改善"
                        ],
                        prevention_measures=[
                            "定期分析任务性能",
                            "优化数据分布",
                            "监控系统资源"
                        ]
                    )
                ],
                related_cases=["case_002", "case_006"]
            )
        ]
        
        return cases
    
    def diagnose_issue(self, symptoms: List[str], metrics: Dict[str, float], logs: List[str]) -> List[TroubleshootingCase]:
        """
        根据症状、指标和日志诊断问题
        
        Args:
            symptoms: 观察到的症状列表
            metrics: 相关指标值
            logs: 日志内容
            
        Returns:
            List[TroubleshootingCase]: 匹配的故障案例列表
        """
        matched_cases = []
        
        for case in self.cases:
            score = self._calculate_match_score(case, symptoms, metrics, logs)
            if score > 0.5:  # 匹配度阈值
                matched_cases.append((case, score))
        
        # 按匹配度排序
        matched_cases.sort(key=lambda x: x[1], reverse=True)
        
        return [case for case, score in matched_cases]
    
    def get_solution_steps(self, case_id: str, solution_id: str) -> Dict[str, Any]:
        """
        获取特定解决方案的详细步骤
        
        Args:
            case_id: 案例ID
            solution_id: 解决方案ID
            
        Returns:
            Dict[str, Any]: 解决方案详细信息
        """
        case = self._get_case_by_id(case_id)
        if not case:
            return {}
        
        solution = next((s for s in case.solutions if s.id == solution_id), None)
        if not solution:
            return {}
        
        return {
            'title': solution.title,
            'description': solution.description,
            'steps': solution.steps,
            'configuration_changes': solution.configuration_changes,
            'verification_steps': solution.verification_steps,
            'prevention_measures': solution.prevention_measures,
            'estimated_time': self._estimate_solution_time(solution),
            'risk_level': self._assess_solution_risk(solution)
        }
    
    def generate_troubleshooting_report(self, case_ids: List[str]) -> Dict[str, Any]:
        """
        生成故障排除报告
        
        Args:
            case_ids: 案例ID列表
            
        Returns:
            Dict[str, Any]: 故障排除报告
        """
        report = {
            'summary': {
                'total_cases': len(case_ids),
                'severity_distribution': {},
                'fault_type_distribution': {},
                'estimated_resolution_time': 0
            },
            'cases': [],
            'recommendations': [],
            'prevention_strategies': []
        }
        
        for case_id in case_ids:
            case = self._get_case_by_id(case_id)
            if not case:
                continue
            
            # 统计严重程度分布
            severity = case.severity.value
            report['summary']['severity_distribution'][severity] = \
                report['summary']['severity_distribution'].get(severity, 0) + 1
            
            # 统计故障类型分布
            fault_type = case.fault_type.value
            report['summary']['fault_type_distribution'][fault_type] = \
                report['summary']['fault_type_distribution'].get(fault_type, 0) + 1
            
            # 添加案例信息
            case_info = {
                'id': case.id,
                'title': case.title,
                'severity': case.severity.value,
                'fault_type': case.fault_type.value,
                'solutions_count': len(case.solutions),
                'recommended_solution': case.solutions[0].id if case.solutions else None
            }
            report['cases'].append(case_info)
        
        # 生成建议
        report['recommendations'] = self._generate_general_recommendations(case_ids)
        
        # 生成预防策略
        report['prevention_strategies'] = self._generate_prevention_strategies(case_ids)
        
        return report
    
    def _calculate_match_score(self, case: TroubleshootingCase, symptoms: List[str], 
                              metrics: Dict[str, float], logs: List[str]) -> float:
        """
        计算案例匹配度
        
        Args:
            case: 故障案例
            symptoms: 症状列表
            metrics: 指标值
            logs: 日志内容
            
        Returns:
            float: 匹配度分数 (0-1)
        """
        total_score = 0
        max_score = 0
        
        for symptom in case.symptoms:
            max_score += 1
            
            # 检查症状匹配
            symptom_match = any(indicator.lower() in ' '.join(symptoms).lower() 
                              for indicator in symptom.indicators)
            if symptom_match:
                total_score += 0.4
            
            # 检查指标阈值
            metrics_match = any(metrics.get(metric, 0) >= threshold 
                              for metric, threshold in symptom.metrics_thresholds.items())
            if metrics_match:
                total_score += 0.3
            
            # 检查日志模式
            log_content = ' '.join(logs)
            log_match = any(re.search(pattern, log_content) 
                          for pattern in symptom.log_patterns)
            if log_match:
                total_score += 0.3
        
        return total_score / max_score if max_score > 0 else 0
    
    def _get_case_by_id(self, case_id: str) -> Optional[TroubleshootingCase]:
        """
        根据ID获取案例
        
        Args:
            case_id: 案例ID
            
        Returns:
            Optional[TroubleshootingCase]: 案例对象
        """
        return next((case for case in self.cases if case.id == case_id), None)
    
    def _estimate_solution_time(self, solution: Solution) -> str:
        """
        估算解决方案执行时间
        
        Args:
            solution: 解决方案
            
        Returns:
            str: 估算时间
        """
        step_count = len(solution.steps)
        if step_count <= 3:
            return "30分钟"
        elif step_count <= 6:
            return "1-2小时"
        else:
            return "2-4小时"
    
    def _assess_solution_risk(self, solution: Solution) -> str:
        """
        评估解决方案风险级别
        
        Args:
            solution: 解决方案
            
        Returns:
            str: 风险级别
        """
        config_changes = len(solution.configuration_changes)
        if config_changes == 0:
            return "低"
        elif config_changes <= 3:
            return "中"
        else:
            return "高"
    
    def _generate_general_recommendations(self, case_ids: List[str]) -> List[Dict[str, str]]:
        """
        生成通用建议
        
        Args:
            case_ids: 案例ID列表
            
        Returns:
            List[Dict[str, str]]: 建议列表
        """
        recommendations = [
            {
                'title': '建立完善的监控体系',
                'description': '部署全面的YARN监控,及时发现和预防问题',
                'priority': 'HIGH'
            },
            {
                'title': '制定故障响应流程',
                'description': '建立标准化的故障处理流程,提高响应效率',
                'priority': 'MEDIUM'
            },
            {
                'title': '定期进行健康检查',
                'description': '定期检查集群健康状态,预防潜在问题',
                'priority': 'MEDIUM'
            }
        ]
        
        return recommendations
    
    def _generate_prevention_strategies(self, case_ids: List[str]) -> List[Dict[str, str]]:
        """
        生成预防策略
        
        Args:
            case_ids: 案例ID列表
            
        Returns:
            List[Dict[str, str]]: 预防策略列表
        """
        strategies = [
            {
                'category': '容量规划',
                'strategy': '基于历史数据和增长趋势进行容量规划',
                'implementation': '定期分析资源使用趋势,提前扩容'
            },
            {
                'category': '配置管理',
                'strategy': '建立配置版本控制和变更管理流程',
                'implementation': '使用配置管理工具,记录所有配置变更'
            },
            {
                'category': '性能优化',
                'strategy': '持续优化应用和集群性能',
                'implementation': '定期进行性能测试和调优'
            }
        ]
        
        return strategies

# 使用示例
if __name__ == "__main__":
    # 创建故障排除指南
    guide = YARNTroubleshootingGuide()
    
    print("=== YARN故障排除指南 ===")
    
    # 显示所有故障类型的案例数量
    print("\n各故障类型案例数量:")
    fault_type_counts = {}
    for case in guide.cases:
        fault_type = case.fault_type.value
        fault_type_counts[fault_type] = fault_type_counts.get(fault_type, 0) + 1
    
    for fault_type, count in fault_type_counts.items():
        print(f"  {fault_type}: {count} 个案例")
    
    # 模拟故障诊断
    symptoms = [
        "应用长时间处于ACCEPTED状态",
        "集群资源利用率接近100%",
        "队列中有大量等待的应用"
    ]
    
    metrics = {
        "cluster_memory_utilization": 0.98,
        "cluster_cpu_utilization": 0.96,
        "pending_applications": 15
    }
    
    logs = [
        "Application app_001 is added to the scheduler",
        "Cannot allocate container: insufficient resources"
    ]
    
    print("\n=== 故障诊断结果 ===")
    matched_cases = guide.diagnose_issue(symptoms, metrics, logs)
    
    if matched_cases:
        print(f"找到 {len(matched_cases)} 个匹配的故障案例:")
        for i, case in enumerate(matched_cases[:3], 1):  # 显示前3个
            print(f"\n{i}. [{case.id}] {case.title}")
            print(f"   故障类型: {case.fault_type.value}")
            print(f"   严重程度: {case.severity.value}")
            print(f"   描述: {case.description}")
            
            if case.solutions:
                print(f"   推荐解决方案: {case.solutions[0].title}")
                
                # 获取解决方案详细步骤
                solution_details = guide.get_solution_steps(case.id, case.solutions[0].id)
                if solution_details:
                    print(f"   预估时间: {solution_details['estimated_time']}")
                    print(f"   风险级别: {solution_details['risk_level']}")
                    print("   执行步骤:")
                    for step in solution_details['steps'][:3]:  # 显示前3步
                        print(f"     - {step}")
    else:
        print("未找到匹配的故障案例")
    
    # 生成故障排除报告
    case_ids = [case.id for case in matched_cases[:2]]  # 选择前2个案例
    if case_ids:
        print("\n=== 故障排除报告 ===")
        report = guide.generate_troubleshooting_report(case_ids)
        
        print(f"总案例数: {report['summary']['total_cases']}")
        
        print("\n严重程度分布:")
        for severity, count in report['summary']['severity_distribution'].items():
            print(f"  {severity}: {count}")
        
        print("\n故障类型分布:")
        for fault_type, count in report['summary']['fault_type_distribution'].items():
            print(f"  {fault_type}: {count}")
        
        print("\n通用建议:")
        for rec in report['recommendations']:
            print(f"  [{rec['priority']}] {rec['title']}")
            print(f"    {rec['description']}")
        
        print("\n预防策略:")
         for strategy in report['prevention_strategies']:
             print(f"  {strategy['category']}: {strategy['strategy']}")
             print(f"    实施方法: {strategy['implementation']}")

6. YARN总结

6.1 核心概念回顾

from typing import Dict, List, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class YARNCoreConcept:
    """YARN核心概念"""
    name: str
    description: str
    key_features: List[str]
    benefits: List[str]
    use_cases: List[str]

class YARNSummaryGuide:
    """
    YARN总结指南
    """
    
    def __init__(self):
        self.core_concepts = self._initialize_core_concepts()
        self.key_advantages = self._initialize_key_advantages()
        self.best_practices = self._initialize_best_practices()
        self.future_trends = self._initialize_future_trends()
    
    def _initialize_core_concepts(self) -> List[YARNCoreConcept]:
        """
        初始化YARN核心概念
        
        Returns:
            List[YARNCoreConcept]: 核心概念列表
        """
        concepts = [
            YARNCoreConcept(
                name="ResourceManager",
                description="YARN集群的主节点,负责全局资源管理和应用调度",
                key_features=[
                    "全局资源管理",
                    "应用程序调度",
                    "集群监控",
                    "高可用支持",
                    "Web UI管理界面"
                ],
                benefits=[
                    "集中化资源管理",
                    "高效的资源分配",
                    "支持多种计算框架",
                    "提供统一的集群视图"
                ],
                use_cases=[
                    "大数据处理集群管理",
                    "多租户环境资源分配",
                    "批处理和流处理作业调度"
                ]
            ),
            YARNCoreConcept(
                name="NodeManager",
                description="运行在每个工作节点上,负责本地资源管理和容器生命周期",
                key_features=[
                    "本地资源监控",
                    "容器生命周期管理",
                    "心跳机制",
                    "日志聚合",
                    "健康检查"
                ],
                benefits=[
                    "细粒度资源控制",
                    "容器隔离",
                    "故障检测和恢复",
                    "资源使用监控"
                ],
                use_cases=[
                    "工作节点资源管理",
                    "应用容器运行环境",
                    "分布式计算任务执行"
                ]
            ),
            YARNCoreConcept(
                name="ApplicationMaster",
                description="每个应用程序的主控进程,负责应用的资源协商和任务管理",
                key_features=[
                    "资源请求和协商",
                    "任务调度和监控",
                    "故障处理",
                    "进度报告",
                    "应用生命周期管理"
                ],
                benefits=[
                    "应用级别的资源管理",
                    "灵活的任务调度",
                    "故障恢复能力",
                    "与ResourceManager解耦"
                ],
                use_cases=[
                    "MapReduce作业管理",
                    "Spark应用协调",
                    "自定义计算框架集成"
                ]
            ),
            YARNCoreConcept(
                name="Container",
                description="YARN中的基本计算单元,封装了CPU、内存等资源",
                key_features=[
                    "资源封装",
                    "进程隔离",
                    "生命周期管理",
                    "资源限制",
                    "安全控制"
                ],
                benefits=[
                    "资源隔离和保护",
                    "多租户支持",
                    "弹性资源分配",
                    "安全的执行环境"
                ],
                use_cases=[
                    "任务执行环境",
                    "应用进程容器",
                    "资源配额管理"
                ]
            ),
            YARNCoreConcept(
                name="调度器",
                description="负责将集群资源分配给各个应用程序的组件",
                key_features=[
                    "多种调度策略",
                    "队列管理",
                    "资源抢占",
                    "优先级支持",
                    "公平性保证"
                ],
                benefits=[
                    "灵活的资源分配策略",
                    "支持多种工作负载",
                    "提高集群利用率",
                    "保证服务质量"
                ],
                use_cases=[
                    "多队列资源管理",
                    "优先级作业调度",
                    "资源配额控制"
                ]
            )
        ]
        
        return concepts
    
    def _initialize_key_advantages(self) -> Dict[str, List[str]]:
        """
        初始化YARN关键优势
        
        Returns:
            Dict[str, List[str]]: 关键优势分类
        """
        advantages = {
            "架构优势": [
                "分离资源管理和作业调度",
                "支持多种计算框架",
                "更好的可扩展性",
                "提高集群利用率",
                "减少单点故障"
            ],
            "资源管理优势": [
                "细粒度资源分配",
                "动态资源调整",
                "多租户资源隔离",
                "资源配额管理",
                "弹性资源伸缩"
            ],
            "调度优势": [
                "多种调度策略",
                "优先级调度",
                "公平调度",
                "容量调度",
                "资源抢占机制"
            ],
            "运维优势": [
                "统一的集群管理",
                "丰富的监控指标",
                "Web UI管理界面",
                "日志聚合功能",
                "故障自动恢复"
            ],
            "开发优势": [
                "简化应用开发",
                "标准化API接口",
                "容器化执行环境",
                "灵活的资源请求",
                "易于集成新框架"
            ]
        }
        
        return advantages
    
    def _initialize_best_practices(self) -> Dict[str, List[Dict[str, str]]]:
        """
        初始化最佳实践
        
        Returns:
            Dict[str, List[Dict[str, str]]]: 最佳实践分类
        """
        practices = {
            "部署实践": [
                {
                    "practice": "合理规划集群规模",
                    "description": "根据业务需求和增长预期规划集群节点数量和配置",
                    "implementation": "进行容量规划,考虑峰值负载和扩展需求"
                },
                {
                    "practice": "配置高可用",
                    "description": "部署ResourceManager高可用,避免单点故障",
                    "implementation": "配置RM HA,使用ZooKeeper进行故障切换"
                },
                {
                    "practice": "网络优化",
                    "description": "优化网络配置,提高数据传输效率",
                    "implementation": "使用高速网络,优化网络拓扑结构"
                }
            ],
            "配置实践": [
                {
                    "practice": "合理配置资源",
                    "description": "根据硬件配置合理设置内存和CPU资源",
                    "implementation": "预留系统资源,避免资源过度分配"
                },
                {
                    "practice": "优化调度器配置",
                    "description": "根据业务特点选择和配置合适的调度器",
                    "implementation": "配置队列容量、用户限制和抢占策略"
                },
                {
                    "practice": "安全配置",
                    "description": "启用安全认证和授权机制",
                    "implementation": "配置Kerberos认证,设置访问控制列表"
                }
            ],
            "运维实践": [
                {
                    "practice": "建立监控体系",
                    "description": "部署全面的监控系统,及时发现问题",
                    "implementation": "使用Ganglia、Nagios等监控工具"
                },
                {
                    "practice": "定期健康检查",
                    "description": "定期检查集群健康状态和性能指标",
                    "implementation": "制定检查清单,自动化健康检查流程"
                },
                {
                    "practice": "日志管理",
                    "description": "集中管理和分析日志,便于故障排查",
                    "implementation": "配置日志聚合,使用ELK等日志分析工具"
                }
            ],
            "性能实践": [
                {
                    "practice": "资源调优",
                    "description": "根据应用特点调优资源配置",
                    "implementation": "分析应用资源使用模式,优化内存和CPU配置"
                },
                {
                    "practice": "数据本地化",
                    "description": "优化数据本地化,减少网络传输",
                    "implementation": "合理放置数据,启用机架感知"
                },
                {
                    "practice": "并发控制",
                    "description": "控制并发作业数量,避免资源竞争",
                    "implementation": "设置合理的队列容量和用户限制"
                }
            ]
        }
        
        return practices
    
    def _initialize_future_trends(self) -> List[Dict[str, str]]:
        """
        初始化未来发展趋势
        
        Returns:
            List[Dict[str, str]]: 发展趋势列表
        """
        trends = [
            {
                "trend": "云原生集成",
                "description": "与Kubernetes等云原生技术深度集成",
                "impact": "提供更好的容器化支持和云端部署能力",
                "timeline": "短期"
            },
            {
                "trend": "GPU资源支持",
                "description": "原生支持GPU等异构计算资源",
                "impact": "更好地支持机器学习和AI工作负载",
                "timeline": "中期"
            },
            {
                "trend": "智能调度",
                "description": "基于机器学习的智能资源调度",
                "impact": "提高资源利用率和作业性能",
                "timeline": "中期"
            },
            {
                "trend": "边缘计算支持",
                "description": "支持边缘计算场景的资源管理",
                "impact": "扩展到边缘计算和IoT场景",
                "timeline": "长期"
            },
            {
                "trend": "多云管理",
                "description": "跨多个云平台的统一资源管理",
                "impact": "提供混合云和多云环境的统一管理",
                "timeline": "长期"
            },
            {
                "trend": "实时计算优化",
                "description": "针对实时计算工作负载的优化",
                "impact": "更好地支持流处理和实时分析",
                "timeline": "短期"
            }
        ]
        
        return trends
    
    def get_concept_summary(self, concept_name: str = None) -> Dict[str, Any]:
        """
        获取概念总结
        
        Args:
            concept_name: 概念名称,如果为None则返回所有概念
            
        Returns:
            Dict[str, Any]: 概念总结
        """
        if concept_name:
            concept = next((c for c in self.core_concepts if c.name == concept_name), None)
            if concept:
                return {
                    'name': concept.name,
                    'description': concept.description,
                    'key_features': concept.key_features,
                    'benefits': concept.benefits,
                    'use_cases': concept.use_cases
                }
            return {}
        
        return {
            'total_concepts': len(self.core_concepts),
            'concepts': [
                {
                    'name': concept.name,
                    'description': concept.description,
                    'features_count': len(concept.key_features),
                    'benefits_count': len(concept.benefits)
                }
                for concept in self.core_concepts
            ]
        }
    
    def get_advantages_summary(self) -> Dict[str, Any]:
        """
        获取优势总结
        
        Returns:
            Dict[str, Any]: 优势总结
        """
        return {
            'categories': list(self.key_advantages.keys()),
            'total_advantages': sum(len(advantages) for advantages in self.key_advantages.values()),
            'advantages_by_category': self.key_advantages,
            'top_advantages': [
                "支持多种计算框架",
                "细粒度资源分配",
                "提高集群利用率",
                "多租户资源隔离",
                "统一的集群管理"
            ]
        }
    
    def get_best_practices_summary(self, category: str = None) -> Dict[str, Any]:
        """
        获取最佳实践总结
        
        Args:
            category: 实践类别
            
        Returns:
            Dict[str, Any]: 最佳实践总结
        """
        if category and category in self.best_practices:
            return {
                'category': category,
                'practices': self.best_practices[category]
            }
        
        return {
            'categories': list(self.best_practices.keys()),
            'total_practices': sum(len(practices) for practices in self.best_practices.values()),
            'practices_by_category': {
                category: len(practices) 
                for category, practices in self.best_practices.items()
            },
            'key_practices': [
                "合理规划集群规模",
                "配置高可用",
                "建立监控体系",
                "资源调优",
                "安全配置"
            ]
        }
    
    def get_future_trends_summary(self) -> Dict[str, Any]:
        """
        获取未来趋势总结
        
        Returns:
            Dict[str, Any]: 未来趋势总结
        """
        timeline_distribution = {}
        for trend in self.future_trends:
            timeline = trend['timeline']
            timeline_distribution[timeline] = timeline_distribution.get(timeline, 0) + 1
        
        return {
            'total_trends': len(self.future_trends),
            'timeline_distribution': timeline_distribution,
            'trends': self.future_trends,
            'key_trends': [
                "云原生集成",
                "GPU资源支持",
                "智能调度",
                "多云管理"
            ]
        }
    
    def generate_comprehensive_summary(self) -> Dict[str, Any]:
        """
        生成综合总结报告
        
        Returns:
            Dict[str, Any]: 综合总结报告
        """
        return {
            'overview': {
                'title': 'Apache YARN 资源管理系统',
                'description': 'YARN是Hadoop 2.0的核心组件,提供了通用的资源管理和作业调度平台',
                'key_value': '将资源管理与作业调度分离,支持多种计算框架在同一集群上运行'
            },
            'core_concepts': self.get_concept_summary(),
            'key_advantages': self.get_advantages_summary(),
            'best_practices': self.get_best_practices_summary(),
            'future_trends': self.get_future_trends_summary(),
            'conclusion': {
                'strengths': [
                    "统一的资源管理平台",
                    "支持多种计算框架",
                    "高可扩展性和可靠性",
                    "丰富的调度策略",
                    "完善的监控和管理功能"
                ],
                'use_cases': [
                    "大数据批处理",
                    "实时流处理",
                    "机器学习训练",
                    "多租户数据分析",
                    "混合工作负载管理"
                ],
                'recommendations': [
                    "根据业务需求选择合适的调度器",
                    "建立完善的监控和告警体系",
                    "定期进行性能调优和容量规划",
                    "关注新特性和发展趋势",
                    "建立标准化的运维流程"
                ]
            }
        }

# 使用示例
if __name__ == "__main__":
    # 创建YARN总结指南
    summary_guide = YARNSummaryGuide()
    
    print("=== YARN 总结指南 ===")
    
    # 获取核心概念总结
    print("\n=== 核心概念总结 ===")
    concepts_summary = summary_guide.get_concept_summary()
    print(f"总概念数: {concepts_summary['total_concepts']}")
    
    for concept in concepts_summary['concepts']:
        print(f"\n{concept['name']}:")
        print(f"  描述: {concept['description']}")
        print(f"  特性数量: {concept['features_count']}")
        print(f"  优势数量: {concept['benefits_count']}")
    
    # 获取关键优势总结
    print("\n=== 关键优势总结 ===")
    advantages_summary = summary_guide.get_advantages_summary()
    print(f"优势类别: {len(advantages_summary['categories'])}")
    print(f"总优势数: {advantages_summary['total_advantages']}")
    
    print("\n各类别优势数量:")
    for category, advantages in advantages_summary['advantages_by_category'].items():
        print(f"  {category}: {len(advantages)} 个")
    
    print("\n核心优势:")
    for advantage in advantages_summary['top_advantages']:
        print(f"  - {advantage}")
    
    # 获取最佳实践总结
    print("\n=== 最佳实践总结 ===")
    practices_summary = summary_guide.get_best_practices_summary()
    print(f"实践类别: {len(practices_summary['categories'])}")
    print(f"总实践数: {practices_summary['total_practices']}")
    
    print("\n各类别实践数量:")
    for category, count in practices_summary['practices_by_category'].items():
        print(f"  {category}: {count} 个")
    
    print("\n关键实践:")
    for practice in practices_summary['key_practices']:
        print(f"  - {practice}")
    
    # 获取未来趋势总结
    print("\n=== 未来发展趋势 ===")
    trends_summary = summary_guide.get_future_trends_summary()
    print(f"总趋势数: {trends_summary['total_trends']}")
    
    print("\n时间线分布:")
    for timeline, count in trends_summary['timeline_distribution'].items():
        print(f"  {timeline}: {count} 个趋势")
    
    print("\n关键趋势:")
    for trend in trends_summary['key_trends']:
        print(f"  - {trend}")
    
    # 生成综合总结报告
    print("\n=== 综合总结报告 ===")
    comprehensive_summary = summary_guide.generate_comprehensive_summary()
    
    print(f"\n{comprehensive_summary['overview']['title']}")
    print(f"描述: {comprehensive_summary['overview']['description']}")
    print(f"核心价值: {comprehensive_summary['overview']['key_value']}")
    
    print("\n核心优势:")
    for strength in comprehensive_summary['conclusion']['strengths']:
        print(f"  - {strength}")
    
    print("\n适用场景:")
    for use_case in comprehensive_summary['conclusion']['use_cases']:
        print(f"  - {use_case}")
    
    print("\n建议:")
    for recommendation in comprehensive_summary['conclusion']['recommendations']:
        print(f"  - {recommendation}")
    
    print("\n=== YARN学习总结 ===")
    print("通过本章学习,您应该掌握:")
    print("1. YARN的核心架构和组件")
    print("2. 资源管理和调度机制")
    print("3. 应用程序生命周期管理")
    print("4. 性能监控和优化策略")
    print("5. 故障排除和最佳实践")
    print("\nYARN作为Hadoop生态系统的核心组件,为大数据处理提供了强大的资源管理能力。")
    print("掌握YARN的使用和管理,对于构建高效、可靠的大数据平台至关重要。")