3.1 HDFS概述

3.1.1 HDFS简介

HDFS(Hadoop Distributed File System)是Apache Hadoop的核心组件之一,是一个分布式文件系统,专为在商用硬件集群上存储大文件而设计。HDFS具有高容错性、高吞吐量和可扩展性等特点。

3.1.2 HDFS设计目标

  1. 硬件故障处理:假设硬件故障是常态而非异常
  2. 流式数据访问:为批处理而非交互式应用设计
  3. 大数据集:支持TB到PB级别的数据存储
  4. 简单一致性模型:一次写入,多次读取的访问模式
  5. 移动计算而非数据:将计算移动到数据所在位置
  6. 跨异构硬件和软件平台的可移植性
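
为了更直观地理解"大数据集"和"大块存储"这两个设计目标,下面用一小段Python估算不同块大小下的块数量与NameNode的元数据内存压力。其中"每个块对象约占150字节内存"只是社区常用的粗略经验值,数字仅作示意:

import math

def estimate_blocks(total_size_bytes: int, block_size_bytes: int = 128 * 1024 * 1024,
                    replication: int = 3, bytes_per_block_object: int = 150) -> dict:
    """粗略估算块数量、含副本的物理占用以及NameNode元数据内存(经验值,仅作示意)"""
    block_count = math.ceil(total_size_bytes / block_size_bytes)   # 逻辑块数量
    physical_bytes = total_size_bytes * replication                # 含副本的物理存储占用
    namenode_memory = block_count * bytes_per_block_object         # 仅按块数量粗估
    return {
        "block_count": block_count,
        "physical_tb": physical_bytes / 1024 ** 4,
        "namenode_memory_mb": namenode_memory / 1024 ** 2,
    }

if __name__ == "__main__":
    # 1TB数据按128MB分块约产生8192个块;若换成1MB的小文件,块数量会放大128倍,
    # 这也是后文提到"HDFS不适合大量小文件"的原因之一
    print(estimate_blocks(1024 ** 4))
    print(estimate_blocks(1024 ** 4, block_size_bytes=1024 * 1024))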

3.1.3 HDFS特点

class HDFSCharacteristics:
    """
    HDFS特点分析器
    """
    
    def __init__(self):
        self.characteristics = {
            "高容错性": {
                "描述": "通过数据副本机制保证数据可靠性",
                "实现方式": ["数据副本", "心跳检测", "自动故障恢复"],
                "优势": "即使部分节点故障,系统仍能正常运行"
            },
            "高吞吐量": {
                "描述": "优化大文件的顺序读写性能",
                "实现方式": ["大块存储", "流式访问", "并行处理"],
                "优势": "适合大数据批处理应用"
            },
            "可扩展性": {
                "描述": "支持水平扩展到数千个节点",
                "实现方式": ["分布式架构", "动态添加节点", "负载均衡"],
                "优势": "随业务增长灵活扩展存储容量"
            },
            "成本效益": {
                "描述": "运行在商用硬件上,降低存储成本",
                "实现方式": ["商用硬件", "开源软件", "线性扩展"],
                "优势": "相比传统存储系统成本更低"
            }
        }
    
    def analyze_characteristic(self, char_name: str) -> dict:
        """
        分析特定特点
        
        Args:
            char_name: 特点名称
            
        Returns:
            dict: 特点详细信息
        """
        return self.characteristics.get(char_name, {})
    
    def compare_with_traditional_fs(self) -> dict:
        """
        与传统文件系统对比
        
        Returns:
            dict: 对比结果
        """
        comparison = {
            "存储容量": {
                "传统文件系统": "单机存储,容量受限",
                "HDFS": "分布式存储,容量可扩展到PB级"
            },
            "容错性": {
                "传统文件系统": "依赖RAID等硬件容错",
                "HDFS": "软件层面实现数据副本容错"
            },
            "访问模式": {
                "传统文件系统": "支持随机读写",
                "HDFS": "优化顺序读写,不支持随机写"
            },
            "成本": {
                "传统文件系统": "需要高端存储设备",
                "HDFS": "使用商用硬件,成本较低"
            },
            "扩展性": {
                "传统文件系统": "垂直扩展,成本高",
                "HDFS": "水平扩展,成本线性增长"
            }
        }
        return comparison
    
    def get_use_cases(self) -> dict:
        """
        获取适用场景
        
        Returns:
            dict: 适用场景信息
        """
        use_cases = {
            "适合场景": [
                "大数据批处理",
                "数据仓库",
                "日志文件存储",
                "备份和归档",
                "科学计算数据",
                "机器学习训练数据"
            ],
            "不适合场景": [
                "低延迟数据访问",
                "大量小文件存储",
                "频繁的随机写操作",
                "实时数据处理",
                "需要POSIX语义的应用"
            ]
        }
        return use_cases

# 使用示例
if __name__ == "__main__":
    analyzer = HDFSCharacteristics()
    
    print("=== HDFS特点分析 ===")
    for char_name in analyzer.characteristics:
        char_info = analyzer.analyze_characteristic(char_name)
        print(f"\n{char_name}:")
        print(f"  描述: {char_info['描述']}")
        print(f"  实现方式: {', '.join(char_info['实现方式'])}")
        print(f"  优势: {char_info['优势']}")
    
    print("\n=== 与传统文件系统对比 ===")
    comparison = analyzer.compare_with_traditional_fs()
    for aspect, details in comparison.items():
        print(f"\n{aspect}:")
        print(f"  传统文件系统: {details['传统文件系统']}")
        print(f"  HDFS: {details['HDFS']}")
    
    print("\n=== 适用场景 ===")
    use_cases = analyzer.get_use_cases()
    print("适合场景:")
    for case in use_cases['适合场景']:
        print(f"  - {case}")
    
    print("\n不适合场景:")
    for case in use_cases['不适合场景']:
        print(f"  - {case}")

3.2 HDFS架构

3.2.1 主从架构

HDFS采用主从(Master/Slave)架构,主要包含以下组件:

  1. NameNode(主节点):管理文件系统的命名空间和元数据
  2. DataNode(从节点):存储实际的数据块
  3. Secondary NameNode:定期合并fsimage和edit log(检查点操作),减轻NameNode负担;它并不是NameNode的热备节点
  4. Client:文件系统客户端

3.2.2 架构组件详解

import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum

class NodeStatus(Enum):
    """节点状态枚举"""
    ACTIVE = "active"
    STANDBY = "standby"
    DEAD = "dead"
    DECOMMISSIONING = "decommissioning"

@dataclass
class DataBlock:
    """数据块信息"""
    block_id: int
    size: int
    checksum: str
    locations: List[str]  # DataNode位置
    replication_factor: int
    timestamp: float

@dataclass
class FileMetadata:
    """文件元数据"""
    path: str
    size: int
    block_size: int
    replication_factor: int
    permissions: str
    owner: str
    group: str
    modification_time: float
    access_time: float
    blocks: List[DataBlock]

class NameNode:
    """
    NameNode模拟器
    管理HDFS的命名空间和元数据
    """
    
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.status = NodeStatus.ACTIVE
        self.namespace = {}  # 文件系统命名空间
        self.block_locations = {}  # 块位置映射
        self.datanode_info = {}  # DataNode信息
        self.next_block_id = 1
        self.safe_mode = True
        
    def create_file(self, file_path: str, block_size: int = 134217728, 
                   replication_factor: int = 3) -> FileMetadata:
        """
        创建文件元数据
        
        Args:
            file_path: 文件路径
            block_size: 块大小(默认128MB)
            replication_factor: 副本数
            
        Returns:
            FileMetadata: 文件元数据
        """
        if file_path in self.namespace:
            raise ValueError(f"文件已存在: {file_path}")
        
        metadata = FileMetadata(
            path=file_path,
            size=0,
            block_size=block_size,
            replication_factor=replication_factor,
            permissions="644",
            owner="hadoop",
            group="hadoop",
            modification_time=time.time(),
            access_time=time.time(),
            blocks=[]
        )
        
        self.namespace[file_path] = metadata
        return metadata
    
    def allocate_block(self, file_path: str) -> DataBlock:
        """
        为文件分配新的数据块
        
        Args:
            file_path: 文件路径
            
        Returns:
            DataBlock: 分配的数据块
        """
        if file_path not in self.namespace:
            raise ValueError(f"文件不存在: {file_path}")
        
        metadata = self.namespace[file_path]
        
        # 选择DataNode存储位置
        available_datanodes = [
            node_id for node_id, info in self.datanode_info.items()
            if info['status'] == NodeStatus.ACTIVE
        ]
        
        if len(available_datanodes) < metadata.replication_factor:
            raise RuntimeError("可用DataNode数量不足")
        
        # 简单的副本放置策略:直接取前N个可用节点作为副本位置
        # (真实HDFS采用机架感知策略:第1个副本放在写入客户端所在节点,第2个放在另一机架,第3个与第2个副本同机架)
        selected_nodes = available_datanodes[:metadata.replication_factor]
        
        block = DataBlock(
            block_id=self.next_block_id,
            size=0,
            checksum="",
            locations=selected_nodes,
            replication_factor=metadata.replication_factor,
            timestamp=time.time()
        )
        
        self.next_block_id += 1
        self.block_locations[block.block_id] = block
        metadata.blocks.append(block)
        
        return block
    
    def get_block_locations(self, file_path: str) -> List[DataBlock]:
        """
        获取文件的块位置信息
        
        Args:
            file_path: 文件路径
            
        Returns:
            List[DataBlock]: 块位置列表
        """
        if file_path not in self.namespace:
            raise ValueError(f"文件不存在: {file_path}")
        
        return self.namespace[file_path].blocks
    
    def register_datanode(self, node_id: str, capacity: int, used: int) -> bool:
        """
        注册DataNode
        
        Args:
            node_id: 节点ID
            capacity: 总容量
            used: 已使用容量
            
        Returns:
            bool: 注册是否成功
        """
        self.datanode_info[node_id] = {
            'status': NodeStatus.ACTIVE,
            'capacity': capacity,
            'used': used,
            'last_heartbeat': time.time(),
            'blocks': set()
        }
        
        print(f"DataNode {node_id} 注册成功")
        return True
    
    def heartbeat(self, node_id: str, used: int, blocks: List[int]) -> dict:
        """
        处理DataNode心跳
        
        Args:
            node_id: 节点ID
            used: 已使用容量
            blocks: 节点上的块列表
            
        Returns:
            dict: 心跳响应
        """
        if node_id not in self.datanode_info:
            return {'status': 'error', 'message': '节点未注册'}
        
        node_info = self.datanode_info[node_id]
        node_info['used'] = used
        node_info['last_heartbeat'] = time.time()
        node_info['blocks'] = set(blocks)
        
        # 检查是否需要复制或删除块
        commands = []
        
        # 简化的块管理逻辑
        for block_id in blocks:
            if block_id in self.block_locations:
                block = self.block_locations[block_id]
                current_replicas = len([n for n in block.locations 
                                      if n in self.datanode_info and 
                                      self.datanode_info[n]['status'] == NodeStatus.ACTIVE])
                
                if current_replicas < block.replication_factor:
                    commands.append({
                        'type': 'replicate',
                        'block_id': block_id,
                        'target_nodes': self._select_replication_targets(block)
                    })
        
        return {
            'status': 'success',
            'commands': commands
        }
    
    def _select_replication_targets(self, block: DataBlock) -> List[str]:
        """
        选择副本复制目标节点
        
        Args:
            block: 数据块
            
        Returns:
            List[str]: 目标节点列表
        """
        available_nodes = [
            node_id for node_id, info in self.datanode_info.items()
            if (info['status'] == NodeStatus.ACTIVE and 
                node_id not in block.locations)
        ]
        
        needed_replicas = block.replication_factor - len(block.locations)
        return available_nodes[:needed_replicas]
    
    def get_cluster_status(self) -> dict:
        """
        获取集群状态
        
        Returns:
            dict: 集群状态信息
        """
        total_capacity = sum(info['capacity'] for info in self.datanode_info.values())
        total_used = sum(info['used'] for info in self.datanode_info.values())
        
        active_nodes = sum(1 for info in self.datanode_info.values() 
                          if info['status'] == NodeStatus.ACTIVE)
        
        return {
            'namenode_id': self.node_id,
            'status': self.status.value,
            'safe_mode': self.safe_mode,
            'total_files': len(self.namespace),
            'total_blocks': len(self.block_locations),
            'total_capacity': total_capacity,
            'total_used': total_used,
            'capacity_utilization': (total_used / total_capacity * 100) if total_capacity > 0 else 0,
            'active_datanodes': active_nodes,
            'total_datanodes': len(self.datanode_info)
        }

class DataNode:
    """
    DataNode模拟器
    存储实际的数据块
    """
    
    def __init__(self, node_id: str, capacity: int):
        self.node_id = node_id
        self.capacity = capacity
        self.used = 0
        self.status = NodeStatus.ACTIVE
        self.blocks = {}  # 存储的数据块
        self.namenode_address = None
        
    def store_block(self, block: DataBlock, data: bytes) -> bool:
        """
        存储数据块
        
        Args:
            block: 数据块信息
            data: 数据内容
            
        Returns:
            bool: 存储是否成功
        """
        if self.used + len(data) > self.capacity:
            return False
        
        self.blocks[block.block_id] = {
            'block': block,
            'data': data,
            'checksum': self._calculate_checksum(data)
        }
        
        self.used += len(data)
        return True
    
    def read_block(self, block_id: int) -> Optional[bytes]:
        """
        读取数据块
        
        Args:
            block_id: 块ID
            
        Returns:
            Optional[bytes]: 数据内容
        """
        if block_id in self.blocks:
            return self.blocks[block_id]['data']
        return None
    
    def delete_block(self, block_id: int) -> bool:
        """
        删除数据块
        
        Args:
            block_id: 块ID
            
        Returns:
            bool: 删除是否成功
        """
        if block_id in self.blocks:
            block_info = self.blocks[block_id]
            self.used -= len(block_info['data'])
            del self.blocks[block_id]
            return True
        return False
    
    def _calculate_checksum(self, data: bytes) -> str:
        """
        计算数据校验和
        
        Args:
            data: 数据内容
            
        Returns:
            str: 校验和
        """
        import hashlib
        return hashlib.md5(data).hexdigest()
    
    def get_block_report(self) -> List[int]:
        """
        获取块报告
        
        Returns:
            List[int]: 块ID列表
        """
        return list(self.blocks.keys())
    
    def get_node_status(self) -> dict:
        """
        获取节点状态
        
        Returns:
            dict: 节点状态信息
        """
        return {
            'node_id': self.node_id,
            'status': self.status.value,
            'capacity': self.capacity,
            'used': self.used,
            'available': self.capacity - self.used,
            'utilization': (self.used / self.capacity * 100) if self.capacity > 0 else 0,
            'block_count': len(self.blocks)
        }

# 使用示例
if __name__ == "__main__":
    # 创建NameNode
    namenode = NameNode("nn1")
    
    # 创建DataNode
    datanode1 = DataNode("dn1", 1000000000)  # 1GB
    datanode2 = DataNode("dn2", 1000000000)  # 1GB
    datanode3 = DataNode("dn3", 1000000000)  # 1GB
    
    # 注册DataNode
    namenode.register_datanode("dn1", 1000000000, 0)
    namenode.register_datanode("dn2", 1000000000, 0)
    namenode.register_datanode("dn3", 1000000000, 0)
    
    # 创建文件
    file_metadata = namenode.create_file("/user/data/test.txt")
    print(f"创建文件: {file_metadata.path}")
    
    # 分配数据块
    block = namenode.allocate_block("/user/data/test.txt")
    print(f"分配数据块: {block.block_id}, 位置: {block.locations}")
    
    # 获取集群状态
    cluster_status = namenode.get_cluster_status()
    print("\n=== 集群状态 ===")
    for key, value in cluster_status.items():
        print(f"{key}: {value}")

3.3 HDFS数据读写流程

3.3.1 数据写入流程

HDFS的数据写入采用流水线(pipeline)方式:客户端把数据拆成数据包依次发送给第一个DataNode,再由它转发给后续副本节点,在保证各副本一致的同时充分利用集群带宽。

import threading
import time
from typing import List, Optional, Tuple
from queue import Queue

class HDFSWriteClient:
    """
    HDFS写入客户端模拟器
    """
    
    def __init__(self, namenode: NameNode):
        self.namenode = namenode
        self.current_block = None
        self.pipeline_nodes = []
        self.bytes_written = 0
        
    def create_file(self, file_path: str, data: bytes, 
                   block_size: int = 134217728) -> bool:
        """
        创建文件并写入数据
        
        Args:
            file_path: 文件路径
            data: 要写入的数据
            block_size: 块大小
            
        Returns:
            bool: 写入是否成功
        """
        try:
            # 1. 向NameNode请求创建文件
            print(f"步骤1: 向NameNode请求创建文件 {file_path}")
            metadata = self.namenode.create_file(file_path, block_size)
            
            # 2. 分块写入数据
            data_offset = 0
            block_index = 0
            
            while data_offset < len(data):
                # 计算当前块的数据大小
                remaining_data = len(data) - data_offset
                current_block_size = min(block_size, remaining_data)
                block_data = data[data_offset:data_offset + current_block_size]
                
                # 写入当前块
                success = self._write_block(file_path, block_data, block_index)
                if not success:
                    print(f"块 {block_index} 写入失败")
                    return False
                
                data_offset += current_block_size
                block_index += 1
            
            print(f"文件 {file_path} 写入完成,总大小: {len(data)} 字节")
            return True
            
        except Exception as e:
            print(f"文件写入失败: {e}")
            return False
    
    def _write_block(self, file_path: str, block_data: bytes, 
                    block_index: int) -> bool:
        """
        写入单个数据块
        
        Args:
            file_path: 文件路径
            block_data: 块数据
            block_index: 块索引
            
        Returns:
            bool: 写入是否成功
        """
        try:
            # 1. 向NameNode申请数据块
            print(f"  步骤2.{block_index + 1}.1: 向NameNode申请数据块")
            block = self.namenode.allocate_block(file_path)
            
            # 2. 建立数据流水线
            print(f"  步骤2.{block_index + 1}.2: 建立到DataNode的流水线")
            pipeline_success = self._establish_pipeline(block.locations)
            if not pipeline_success:
                return False
            
            # 3. 流水线写入数据
            print(f"  步骤2.{block_index + 1}.3: 流水线写入数据到 {block.locations}")
            write_success = self._pipeline_write(block, block_data)
            if not write_success:
                return False
            
            # 4. 确认写入完成
            print(f"  步骤2.{block_index + 1}.4: 确认数据块写入完成")
            return self._confirm_write(block)
            
        except Exception as e:
            print(f"  数据块写入失败: {e}")
            return False
    
    def _establish_pipeline(self, datanode_locations: List[str]) -> bool:
        """
        建立DataNode流水线
        
        Args:
            datanode_locations: DataNode位置列表
            
        Returns:
            bool: 流水线建立是否成功
        """
        self.pipeline_nodes = datanode_locations.copy()
        
        # 模拟建立连接
        for i, node in enumerate(self.pipeline_nodes):
            print(f"    连接到DataNode {node} (副本 {i + 1})")
            time.sleep(0.1)  # 模拟网络延迟
        
        return True
    
    def _pipeline_write(self, block: DataBlock, data: bytes) -> bool:
        """
        流水线写入数据
        
        Args:
            block: 数据块信息
            data: 数据内容
            
        Returns:
            bool: 写入是否成功
        """
        # 模拟数据包分割和流水线传输
        packet_size = 64 * 1024  # 64KB数据包
        packets = []
        
        for i in range(0, len(data), packet_size):
            packet_data = data[i:i + packet_size]
            packets.append({
                'sequence': i // packet_size,
                'data': packet_data,
                'checksum': self._calculate_checksum(packet_data)
            })
        
        # 模拟流水线传输
        for packet in packets:
            print(f"    发送数据包 {packet['sequence']}, 大小: {len(packet['data'])} 字节")
            
            # 模拟传输到第一个DataNode,然后流水线传输到其他节点
            for i, node in enumerate(self.pipeline_nodes):
                if i == 0:
                    print(f"      -> {node} (主副本)")
                else:
                    print(f"      -> {node} (副本 {i + 1})")
                time.sleep(0.05)  # 模拟传输延迟
        
        return True
    
    def _confirm_write(self, block: DataBlock) -> bool:
        """
        确认写入完成
        
        Args:
            block: 数据块信息
            
        Returns:
            bool: 确认是否成功
        """
        # 模拟从所有副本收到确认
        confirmations = []
        
        for node in self.pipeline_nodes:
            confirmation = {
                'node': node,
                'block_id': block.block_id,
                'status': 'success',
                'checksum': 'mock_checksum'
            }
            confirmations.append(confirmation)
            print(f"    收到来自 {node} 的写入确认")
        
        # 检查是否所有副本都确认成功
        success_count = sum(1 for conf in confirmations if conf['status'] == 'success')
        
        if success_count >= len(self.pipeline_nodes):
            print(f"    数据块 {block.block_id} 写入成功,{success_count} 个副本")
            return True
        else:
            print(f"    数据块 {block.block_id} 写入失败,只有 {success_count} 个副本成功")
            return False
    
    def _calculate_checksum(self, data: bytes) -> str:
        """
        计算校验和
        
        Args:
            data: 数据内容
            
        Returns:
            str: 校验和
        """
        import hashlib
        return hashlib.md5(data).hexdigest()

class HDFSReadClient:
    """
    HDFS读取客户端模拟器
    """
    
    def __init__(self, namenode: NameNode):
        self.namenode = namenode
        
    def read_file(self, file_path: str) -> bytes:
        """
        读取文件数据
        
        Args:
            file_path: 文件路径
            
        Returns:
            bytes: 文件数据
        """
        try:
            # 1. 向NameNode获取文件元数据
            print(f"步骤1: 向NameNode获取文件 {file_path} 的元数据")
            blocks = self.namenode.get_block_locations(file_path)
            
            if not blocks:
                print(f"文件 {file_path} 不存在或为空")
                return b''
            
            # 2. 按顺序读取所有数据块
            file_data = b''
            
            for i, block in enumerate(blocks):
                print(f"步骤2.{i + 1}: 读取数据块 {block.block_id}")
                block_data = self._read_block(block)
                
                if block_data is None:
                    print(f"数据块 {block.block_id} 读取失败")
                    return b''
                
                file_data += block_data
            
            print(f"文件 {file_path} 读取完成,总大小: {len(file_data)} 字节")
            return file_data
            
        except Exception as e:
            print(f"文件读取失败: {e}")
            return b''
    
    def _read_block(self, block: DataBlock) -> Optional[bytes]:
        """
        读取单个数据块
        
        Args:
            block: 数据块信息
            
        Returns:
            bytes: 块数据
        """
        # 选择最近的DataNode进行读取
        selected_node = self._select_datanode(block.locations)
        
        if not selected_node:
            print(f"  无可用的DataNode读取块 {block.block_id}")
            return None
        
        print(f"  从DataNode {selected_node} 读取块 {block.block_id}")
        
        # 模拟数据读取
        try:
            # 这里应该是实际的网络读取操作
            # 为了演示,我们返回模拟数据
            mock_data = f"Block {block.block_id} data from {selected_node}".encode()
            
            # 验证校验和
            if self._verify_checksum(mock_data, block.checksum):
                print(f"  数据块 {block.block_id} 校验成功")
                return mock_data
            else:
                print(f"  数据块 {block.block_id} 校验失败,尝试其他副本")
                return self._read_block_from_replica(block, selected_node)
                
        except Exception as e:
            print(f"  从 {selected_node} 读取失败: {e},尝试其他副本")
            return self._read_block_from_replica(block, selected_node)
    
    def _select_datanode(self, locations: List[str]) -> Optional[str]:
        """
        选择最优的DataNode进行读取
        
        Args:
            locations: DataNode位置列表
            
        Returns:
            str: 选中的DataNode
        """
        # 简单策略:选择第一个可用的DataNode
        # 实际实现中会考虑网络拓扑、负载等因素
        
        for node in locations:
            if self._is_datanode_available(node):
                return node
        
        return None
    
    def _is_datanode_available(self, node_id: str) -> bool:
        """
        检查DataNode是否可用
        
        Args:
            node_id: 节点ID
            
        Returns:
            bool: 是否可用
        """
        # 模拟可用性检查
        return node_id in self.namenode.datanode_info
    
    def _read_block_from_replica(self, block: DataBlock,
                                 failed_node: str) -> Optional[bytes]:
        """
        从其他副本读取数据块
        
        Args:
            block: 数据块信息
            failed_node: 失败的节点
            
        Returns:
            bytes: 块数据
        """
        remaining_locations = [node for node in block.locations 
                             if node != failed_node]
        
        for node in remaining_locations:
            if self._is_datanode_available(node):
                print(f"  尝试从副本 {node} 读取块 {block.block_id}")
                try:
                    mock_data = f"Block {block.block_id} data from {node}".encode()
                    if self._verify_checksum(mock_data, block.checksum):
                        print(f"  从副本 {node} 读取成功")
                        return mock_data
                except Exception as e:
                    print(f"  从副本 {node} 读取失败: {e}")
                    continue
        
        print(f"  所有副本都无法读取块 {block.block_id}")
        return None
    
    def _verify_checksum(self, data: bytes, expected_checksum: str) -> bool:
        """
        验证数据校验和
        
        Args:
            data: 数据内容
            expected_checksum: 期望的校验和
            
        Returns:
            bool: 校验是否通过
        """
        # 模拟校验和验证
        return True  # 简化实现

# 使用示例
if __name__ == "__main__":
    # 创建NameNode和DataNode
    namenode = NameNode("nn1")
    
    # 注册DataNode
    namenode.register_datanode("dn1", 1000000000, 0)
    namenode.register_datanode("dn2", 1000000000, 0)
    namenode.register_datanode("dn3", 1000000000, 0)
    
    # 创建写入客户端
    write_client = HDFSWriteClient(namenode)
    
    # 写入文件
    test_data = b"This is a test file for HDFS write operation. " * 1000
    print("=== 开始写入文件 ===")
    success = write_client.create_file("/user/test/large_file.txt", test_data)
    
    if success:
        print("\n=== 开始读取文件 ===")
        # 创建读取客户端
        read_client = HDFSReadClient(namenode)
        
        # 读取文件
        read_data = read_client.read_file("/user/test/large_file.txt")
        
        if read_data:
            print(f"读取成功,数据长度: {len(read_data)}")
            print(f"数据匹配: {read_data == test_data}")

3.3.2 数据读取流程

数据读取流程相对简单,主要包括以下步骤:

  1. 客户端向NameNode请求文件元数据
  2. NameNode返回文件的块位置信息
  3. 客户端直接从DataNode读取数据块
  4. 客户端验证数据完整性
  5. 客户端组装完整文件数据
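
这些步骤同样由客户端库封装。以pyarrow为例,顺序读取时客户端先向NameNode获取元数据和块位置,再直接从DataNode拉取数据(以下仅为示意,环境假设与前文写入示例相同):

from pyarrow import fs

hdfs = fs.HadoopFileSystem(host="nn1", port=8020)  # 地址为示例假设

# 查看文件元数据(大小、类型),对应"向NameNode请求文件元数据"这一步
info = hdfs.get_file_info("/user/test/large_file.txt")
print(info.size, info.type)

# 打开输入流顺序读取,块定位与校验和验证由客户端库在底层完成
with hdfs.open_input_stream("/user/test/large_file.txt") as stream:
    data = stream.read()
print(len(data))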

3.4 HDFS命令行操作

3.4.1 基本命令

HDFS提供了丰富的命令行工具,用于文件系统操作:

#!/bin/bash
# HDFS常用命令示例

# 1. 文件系统基本操作
echo "=== 文件系统基本操作 ==="

# 查看HDFS根目录
hdfs dfs -ls /

# 创建目录
hdfs dfs -mkdir -p /user/hadoop/data
hdfs dfs -mkdir -p /user/hadoop/output

# 查看目录内容
hdfs dfs -ls /user/hadoop

# 递归查看目录
hdfs dfs -ls -R /user

# 2. 文件上传和下载
echo "\n=== 文件上传和下载 ==="

# 上传本地文件到HDFS
hdfs dfs -put /local/path/file.txt /user/hadoop/data/

# 从本地复制文件到HDFS(与put相同)
hdfs dfs -copyFromLocal /local/path/file.txt /user/hadoop/data/

# 移动本地文件到HDFS
hdfs dfs -moveFromLocal /local/path/file.txt /user/hadoop/data/

# 从HDFS下载文件到本地
hdfs dfs -get /user/hadoop/data/file.txt /local/path/

# 从HDFS复制文件到本地(与get相同)
hdfs dfs -copyToLocal /user/hadoop/data/file.txt /local/path/

# 3. 文件查看和编辑
echo "\n=== 文件查看和编辑 ==="

# 查看文件内容
hdfs dfs -cat /user/hadoop/data/file.txt

# 查看文件头部
hdfs dfs -head /user/hadoop/data/file.txt

# 查看文件尾部
hdfs dfs -tail /user/hadoop/data/file.txt

# 查看文件的前n行
hdfs dfs -cat /user/hadoop/data/file.txt | head -n 10

# 4. 文件和目录操作
echo "\n=== 文件和目录操作 ==="

# 复制文件
hdfs dfs -cp /user/hadoop/data/file.txt /user/hadoop/backup/

# 移动文件
hdfs dfs -mv /user/hadoop/data/file.txt /user/hadoop/archive/

# 删除文件
hdfs dfs -rm /user/hadoop/data/file.txt

# 递归删除目录
hdfs dfs -rm -r /user/hadoop/temp

# 跳过回收站直接永久删除(默认的 -rm 在启用回收站时会先移入回收站)
hdfs dfs -rm -skipTrash /user/hadoop/data/file.txt

# 5. 文件权限管理
echo "\n=== 文件权限管理 ==="

# 修改文件权限
hdfs dfs -chmod 755 /user/hadoop/data/file.txt

# 递归修改目录权限
hdfs dfs -chmod -R 755 /user/hadoop/data

# 修改文件所有者
hdfs dfs -chown hadoop:hadoop /user/hadoop/data/file.txt

# 递归修改目录所有者
hdfs dfs -chown -R hadoop:hadoop /user/hadoop/data

# 6. 文件信息查看
echo "\n=== 文件信息查看 ==="

# 查看文件状态
hdfs dfs -stat "%n %o %r %u %g %s %y" /user/hadoop/data/file.txt

# 查看文件系统使用情况
hdfs dfs -df -h

# 查看目录大小
hdfs dfs -du -h /user/hadoop

# 查看目录总大小
hdfs dfs -du -s -h /user/hadoop

# 7. 文件完整性检查
echo "\n=== 文件完整性检查 ==="

# 检查文件校验和
hdfs dfs -checksum /user/hadoop/data/file.txt

# 验证文件完整性
hdfs fsck /user/hadoop/data/file.txt

# 检查整个目录
hdfs fsck /user/hadoop -files -blocks -locations

# 8. 配额管理
echo "\n=== 配额管理 ==="

# 设置目录文件数量配额
hdfs dfsadmin -setQuota 1000 /user/hadoop/data

# 设置目录空间配额(1GB)
hdfs dfsadmin -setSpaceQuota 1g /user/hadoop/data

# 查看配额使用情况
hdfs dfs -count -q /user/hadoop/data

# 清除配额
hdfs dfsadmin -clrQuota /user/hadoop/data
hdfs dfsadmin -clrSpaceQuota /user/hadoop/data
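
除了在终端交互式地执行这些命令,也可以在Python脚本中通过subprocess调用hdfs命令并检查返回码,便于和其他自动化流程衔接(以下为通用示意,要求本机已安装Hadoop客户端并配置好集群地址):

import subprocess

def run_hdfs_dfs(args: list) -> str:
    """调用hdfs dfs子命令并返回标准输出,命令失败时抛出CalledProcessError"""
    result = subprocess.run(
        ["hdfs", "dfs"] + args,
        capture_output=True, text=True, check=True
    )
    return result.stdout

if __name__ == "__main__":
    # 示例:列出根目录并查看目录占用
    print(run_hdfs_dfs(["-ls", "/"]))
    print(run_hdfs_dfs(["-du", "-s", "-h", "/user/hadoop"]))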

3.4.2 高级命令和管理操作

class HDFSCommandHelper:
    """
    HDFS命令行操作助手
    """
    
    def __init__(self):
        self.common_commands = {
            "文件操作": {
                "ls": "hdfs dfs -ls <path>",
                "mkdir": "hdfs dfs -mkdir -p <path>",
                "put": "hdfs dfs -put <local_file> <hdfs_path>",
                "get": "hdfs dfs -get <hdfs_file> <local_path>",
                "cp": "hdfs dfs -cp <src> <dst>",
                "mv": "hdfs dfs -mv <src> <dst>",
                "rm": "hdfs dfs -rm [-r] <path>",
                "cat": "hdfs dfs -cat <file>",
                "tail": "hdfs dfs -tail <file>"
            },
            "权限管理": {
                "chmod": "hdfs dfs -chmod [-R] <mode> <path>",
                "chown": "hdfs dfs -chown [-R] <owner:group> <path>",
                "stat": "hdfs dfs -stat <format> <path>"
            },
            "系统信息": {
                "df": "hdfs dfs -df [-h]",
                "du": "hdfs dfs -du [-s] [-h] <path>",
                "count": "hdfs dfs -count [-q] <path>",
                "checksum": "hdfs dfs -checksum <file>"
            },
            "管理命令": {
                "fsck": "hdfs fsck <path> [-files] [-blocks] [-locations]",
                "balancer": "hdfs balancer [-threshold <threshold>]",
                "dfsadmin": "hdfs dfsadmin -report",
                "namenode": "hdfs namenode -format"
            }
        }
    
    def get_command_help(self, category: str = None) -> dict:
        """
        获取命令帮助信息
        
        Args:
            category: 命令分类
            
        Returns:
            dict: 命令帮助信息
        """
        if category:
            return self.common_commands.get(category, {})
        return self.common_commands
    
    def generate_script(self, operations: List[str]) -> str:
        """
        生成HDFS操作脚本
        
        Args:
            operations: 操作列表
            
        Returns:
            str: 生成的脚本内容
        """
        script_lines = [
            "#!/bin/bash",
            "# HDFS操作脚本",
            "# 自动生成",
            "",
            "set -e  # 遇到错误立即退出",
            ""
        ]
        
        for i, operation in enumerate(operations, 1):
            script_lines.extend([
                f"# 操作 {i}: {operation}",
                f"echo \"执行操作 {i}: {operation}\"",
                # 把命令放进if条件中执行:即使脚本开启了set -e,命令失败时也会走到else分支
                f"if {operation}; then",
                f"    echo \"操作 {i} 执行成功\"",
                "else",
                f"    echo \"操作 {i} 执行失败\"",
                "    exit 1",
                "fi",
                ""
            ])
        
        script_lines.append("echo \"所有操作执行完成\"")
        
        return "\n".join(script_lines)
    
    def validate_path(self, path: str) -> dict:
        """
        验证HDFS路径格式
        
        Args:
            path: HDFS路径
            
        Returns:
            dict: 验证结果
        """
        result = {
            "valid": True,
            "errors": [],
            "warnings": []
        }
        
        # 检查路径格式
        if not path.startswith("/"):
            result["valid"] = False
            result["errors"].append("HDFS路径必须以 / 开头")
        
        # 检查路径长度
        if len(path) > 8000:
            result["valid"] = False
            result["errors"].append("路径长度不能超过8000字符")
        
        # 检查非法字符
        illegal_chars = ['\0', '\r', '\n']
        for char in illegal_chars:
            if char in path:
                result["valid"] = False
                result["errors"].append(f"路径包含非法字符: {repr(char)}")
        
        # 检查路径组件
        components = path.split("/")
        for component in components:
            if component == "." or component == "..":
                result["warnings"].append("路径包含相对路径组件")
            
            if len(component) > 255:
                result["valid"] = False
                result["errors"].append(f"路径组件长度不能超过255字符: {component}")
        
        return result
    
    def estimate_operation_time(self, operation: str, file_size: int = 0) -> dict:
        """
        估算操作执行时间
        
        Args:
            operation: 操作类型
            file_size: 文件大小(字节)
            
        Returns:
            dict: 时间估算结果
        """
        # 基础操作时间(秒)
        base_times = {
            "ls": 1,
            "mkdir": 2,
            "rm": 3,
            "chmod": 2,
            "chown": 2,
            "stat": 1,
            "checksum": 5
        }
        
        # 数据传输操作(基于文件大小)
        transfer_operations = ["put", "get", "cp", "mv"]
        
        if operation in base_times:
            estimated_time = base_times[operation]
        elif operation in transfer_operations:
            # 假设传输速度为100MB/s
            transfer_time = file_size / (100 * 1024 * 1024)
            estimated_time = max(transfer_time, 1)  # 最少1秒
        else:
            estimated_time = 5  # 默认5秒
        
        return {
            "operation": operation,
            "estimated_seconds": estimated_time,
            "estimated_minutes": estimated_time / 60,
            "file_size_mb": file_size / (1024 * 1024) if file_size > 0 else 0
        }

# 使用示例
if __name__ == "__main__":
    helper = HDFSCommandHelper()
    
    print("=== HDFS命令帮助 ===")
    for category, commands in helper.get_command_help().items():
        print(f"\n{category}:")
        for cmd, syntax in commands.items():
            print(f"  {cmd}: {syntax}")
    
    print("\n=== 生成操作脚本 ===")
    operations = [
        "hdfs dfs -mkdir -p /user/hadoop/data",
        "hdfs dfs -put /local/data/*.txt /user/hadoop/data/",
        "hdfs dfs -chmod 755 /user/hadoop/data",
        "hdfs dfs -ls /user/hadoop/data"
    ]
    
    script = helper.generate_script(operations)
    print(script)
    
    print("\n=== 路径验证 ===")
    test_paths = [
        "/user/hadoop/data",
        "user/hadoop/data",  # 错误:不以/开头
        "/user/hadoop/../data",  # 警告:包含相对路径
    ]
    
    for path in test_paths:
        result = helper.validate_path(path)
        print(f"路径: {path}")
        print(f"  有效: {result['valid']}")
        if result['errors']:
            print(f"  错误: {result['errors']}")
        if result['warnings']:
            print(f"  警告: {result['warnings']}")
    
    print("\n=== 操作时间估算 ===")
    operations_to_estimate = [
        ("ls", 0),
        ("put", 100 * 1024 * 1024),  # 100MB文件
        ("checksum", 50 * 1024 * 1024)  # 50MB文件
    ]
    
    for op, size in operations_to_estimate:
        estimate = helper.estimate_operation_time(op, size)
        print(f"操作: {estimate['operation']}")
        print(f"  文件大小: {estimate['file_size_mb']:.1f} MB")
         print(f"  估算时间: {estimate['estimated_seconds']:.1f} 秒")

3.5 HDFS故障恢复与高可用

3.5.1 NameNode故障恢复

在未启用高可用(HA)的部署中,NameNode是HDFS的单点故障,因此其元数据恢复与主备切换机制至关重要:

import os
import shutil
import threading
import time
from enum import Enum
from typing import Dict, List, Optional

class NameNodeState(Enum):
    """NameNode状态枚举"""
    ACTIVE = "active"
    STANDBY = "standby"
    FAILED = "failed"
    RECOVERING = "recovering"

class HANameNode:
    """
    高可用NameNode实现
    """
    
    def __init__(self, node_id: str, is_primary: bool = False):
        self.node_id = node_id
        self.state = NameNodeState.STANDBY if not is_primary else NameNodeState.ACTIVE
        self.metadata = {}
        self.edit_log = []
        self.checkpoint_interval = 60  # 检查点间隔(秒)
        self.last_checkpoint = time.time()
        self.shared_storage_path = f"/shared/namenode/{node_id}"
        self.local_storage_path = f"/local/namenode/{node_id}"
        self.is_running = False
        self.monitor_thread = None
        
    def start(self):
        """
        启动NameNode
        """
        print(f"启动NameNode {self.node_id},状态: {self.state.value}")
        self.is_running = True
        
        # 恢复元数据
        self._recover_metadata()
        
        # 启动监控线程
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
        if self.state == NameNodeState.ACTIVE:
            self._start_active_services()
        else:
            self._start_standby_services()
    
    def stop(self):
        """
        停止NameNode
        """
        print(f"停止NameNode {self.node_id}")
        self.is_running = False
        self.state = NameNodeState.FAILED
        
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
    
    def _recover_metadata(self):
        """
        恢复元数据
        """
        print(f"NameNode {self.node_id} 开始恢复元数据...")
        
        try:
            # 1. 加载最新的fsimage
            fsimage_path = os.path.join(self.shared_storage_path, "fsimage")
            if os.path.exists(fsimage_path):
                self._load_fsimage(fsimage_path)
                print(f"  加载fsimage成功")
            
            # 2. 重放edit log
            editlog_path = os.path.join(self.shared_storage_path, "edits")
            if os.path.exists(editlog_path):
                self._replay_edit_log(editlog_path)
                print(f"  重放edit log成功")
            
            # 3. 验证元数据一致性
            if self._validate_metadata():
                print(f"  元数据验证成功")
                # 仅当当前不是Active时才置为Standby,避免把以主节点身份启动的实例错误降级
                if self.state != NameNodeState.ACTIVE:
                    self.state = NameNodeState.STANDBY
            else:
                print(f"  元数据验证失败")
                self.state = NameNodeState.FAILED
                
        except Exception as e:
            print(f"  元数据恢复失败: {e}")
            self.state = NameNodeState.FAILED
    
    def _load_fsimage(self, fsimage_path: str):
        """
        加载fsimage文件
        
        Args:
            fsimage_path: fsimage文件路径
        """
        # 模拟加载fsimage
        # 实际实现中会解析二进制格式的fsimage文件
        self.metadata = {
            "/": {"type": "directory", "children": []},
            "/user": {"type": "directory", "children": []},
            "/tmp": {"type": "directory", "children": []}
        }
    
    def _replay_edit_log(self, editlog_path: str):
        """
        重放edit log
        
        Args:
            editlog_path: edit log文件路径
        """
        # 模拟重放edit log
        # 实际实现中会解析edit log文件并重放操作
        sample_operations = [
            {"op": "create", "path": "/user/hadoop", "type": "directory"},
            {"op": "create", "path": "/user/hadoop/data", "type": "directory"},
            {"op": "create", "path": "/user/hadoop/data/file1.txt", "type": "file"}
        ]
        
        for op in sample_operations:
            self._apply_operation(op)
    
    def _apply_operation(self, operation: dict):
        """
        应用操作到元数据
        
        Args:
            operation: 操作信息
        """
        op_type = operation.get("op")
        path = operation.get("path")
        
        if op_type == "create":
            self.metadata[path] = {
                "type": operation.get("type", "file"),
                "children": [] if operation.get("type") == "directory" else None
            }
    
    def _validate_metadata(self) -> bool:
        """
        验证元数据一致性
        
        Returns:
            bool: 验证是否通过
        """
        # 简单的一致性检查
        required_paths = ["/", "/user", "/tmp"]
        
        for path in required_paths:
            if path not in self.metadata:
                print(f"    缺少必需路径: {path}")
                return False
        
        return True
    
    def _monitor_loop(self):
        """
        监控循环
        """
        while self.is_running:
            try:
                # 检查是否需要创建检查点
                if (time.time() - self.last_checkpoint) > self.checkpoint_interval:
                    if self.state == NameNodeState.ACTIVE:
                        self._create_checkpoint()
                
                # 检查其他NameNode状态
                self._check_peer_status()
                
                time.sleep(10)  # 每10秒检查一次
                
            except Exception as e:
                print(f"监控循环异常: {e}")
    
    def _create_checkpoint(self):
        """
        创建检查点
        """
        print(f"NameNode {self.node_id} 创建检查点...")
        
        try:
            # 1. 保存当前元数据为fsimage
            fsimage_path = os.path.join(self.shared_storage_path, "fsimage")
            self._save_fsimage(fsimage_path)
            
            # 2. 清理旧的edit log
            self._cleanup_edit_log()
            
            self.last_checkpoint = time.time()
            print(f"  检查点创建成功")
            
        except Exception as e:
            print(f"  检查点创建失败: {e}")
    
    def _save_fsimage(self, fsimage_path: str):
        """
        保存fsimage
        
        Args:
            fsimage_path: fsimage保存路径
        """
        # 确保目录存在
        os.makedirs(os.path.dirname(fsimage_path), exist_ok=True)
        
        # 模拟保存fsimage
        # 实际实现中会将元数据序列化为二进制格式
        with open(fsimage_path, 'w') as f:
            f.write(str(self.metadata))
    
    def _cleanup_edit_log(self):
        """
        清理edit log
        """
        # 模拟清理edit log
        self.edit_log = []
    
    def _check_peer_status(self):
        """
        检查对等NameNode状态
        """
        # 模拟检查其他NameNode状态
        # 实际实现中会通过网络检查
        pass
    
    def _start_active_services(self):
        """
        启动Active NameNode服务
        """
        print(f"  启动Active NameNode服务")
        # 启动RPC服务、Web UI等
    
    def _start_standby_services(self):
        """
        启动Standby NameNode服务
        """
        print(f"  启动Standby NameNode服务")
        # 启动edit log同步等服务
    
    def failover_to_active(self):
        """
        切换为Active状态
        """
        if self.state != NameNodeState.STANDBY:
            print(f"NameNode {self.node_id} 当前状态不是Standby,无法切换")
            return False
        
        print(f"NameNode {self.node_id} 开始切换为Active...")
        
        try:
            # 1. 验证元数据是最新的
            if not self._verify_latest_metadata():
                print(f"  元数据不是最新的,切换失败")
                return False
            
            # 2. 切换状态
            self.state = NameNodeState.ACTIVE
            
            # 3. 启动Active服务
            self._start_active_services()
            
            print(f"  切换为Active成功")
            return True
            
        except Exception as e:
            print(f"  切换为Active失败: {e}")
            self.state = NameNodeState.FAILED
            return False
    
    def _verify_latest_metadata(self) -> bool:
        """
        验证元数据是否为最新
        
        Returns:
            bool: 是否为最新
        """
        # 模拟验证元数据是否为最新
        return True

class HDFSFailoverManager:
    """
    HDFS故障转移管理器
    """
    
    def __init__(self):
        self.namenodes: Dict[str, HANameNode] = {}
        self.active_namenode: Optional[str] = None
        self.is_monitoring = False
        self.monitor_thread = None
    
    def add_namenode(self, namenode: HANameNode):
        """
        添加NameNode
        
        Args:
            namenode: NameNode实例
        """
        self.namenodes[namenode.node_id] = namenode
        
        if namenode.state == NameNodeState.ACTIVE:
            self.active_namenode = namenode.node_id
    
    def start_monitoring(self):
        """
        开始监控
        """
        print("开始HDFS故障转移监控...")
        self.is_monitoring = True
        
        self.monitor_thread = threading.Thread(target=self._monitoring_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """
        停止监控
        """
        print("停止HDFS故障转移监控...")
        self.is_monitoring = False
        
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
    
    def _monitoring_loop(self):
        """
        监控循环
        """
        while self.is_monitoring:
            try:
                # 检查Active NameNode状态
                if self.active_namenode:
                    active_nn = self.namenodes.get(self.active_namenode)
                    if active_nn and active_nn.state == NameNodeState.FAILED:
                        print(f"检测到Active NameNode {self.active_namenode} 故障")
                        self._perform_failover()
                
                time.sleep(5)  # 每5秒检查一次
                
            except Exception as e:
                print(f"监控循环异常: {e}")
    
    def _perform_failover(self):
        """
        执行故障转移
        """
        print("开始执行故障转移...")
        
        # 寻找可用的Standby NameNode
        standby_candidates = [
            nn for nn in self.namenodes.values()
            if nn.state == NameNodeState.STANDBY and nn.is_running
        ]
        
        if not standby_candidates:
            print("  没有可用的Standby NameNode,故障转移失败")
            return False
        
        # 选择第一个可用的Standby
        new_active = standby_candidates[0]
        
        print(f"  选择NameNode {new_active.node_id} 作为新的Active")
        
        # 执行切换
        if new_active.failover_to_active():
            self.active_namenode = new_active.node_id
            print(f"  故障转移成功,新的Active NameNode: {self.active_namenode}")
            return True
        else:
            print(f"  故障转移失败")
            return False
    
    def manual_failover(self, target_namenode_id: str) -> bool:
        """
        手动故障转移
        
        Args:
            target_namenode_id: 目标NameNode ID
            
        Returns:
            bool: 是否成功
        """
        print(f"手动故障转移到NameNode {target_namenode_id}...")
        
        target_nn = self.namenodes.get(target_namenode_id)
        if not target_nn:
            print(f"  NameNode {target_namenode_id} 不存在")
            return False
        
        if target_nn.state != NameNodeState.STANDBY:
            print(f"  NameNode {target_namenode_id} 状态不是Standby")
            return False
        
        # 停止当前Active NameNode
        if self.active_namenode:
            current_active = self.namenodes.get(self.active_namenode)
            if current_active:
                current_active.state = NameNodeState.STANDBY
                print(f"  将当前Active NameNode {self.active_namenode} 切换为Standby")
        
        # 切换目标NameNode为Active
        if target_nn.failover_to_active():
            self.active_namenode = target_namenode_id
            print(f"  手动故障转移成功")
            return True
        else:
            print(f"  手动故障转移失败")
            return False
    
    def get_cluster_status(self) -> dict:
        """
        获取集群状态
        
        Returns:
            dict: 集群状态信息
        """
        status = {
            "active_namenode": self.active_namenode,
            "namenodes": {},
            "total_namenodes": len(self.namenodes),
            "healthy_namenodes": 0
        }
        
        for nn_id, nn in self.namenodes.items():
            nn_status = {
                "state": nn.state.value,
                "is_running": nn.is_running,
                "last_checkpoint": nn.last_checkpoint
            }
            status["namenodes"][nn_id] = nn_status
            
            if nn.state in [NameNodeState.ACTIVE, NameNodeState.STANDBY]:
                status["healthy_namenodes"] += 1
        
        return status

# 使用示例
if __name__ == "__main__":
    # 创建HA NameNode集群
    nn1 = HANameNode("namenode1", is_primary=True)
    nn2 = HANameNode("namenode2")
    
    # 创建故障转移管理器
    failover_manager = HDFSFailoverManager()
    failover_manager.add_namenode(nn1)
    failover_manager.add_namenode(nn2)
    
    # 启动NameNode
    nn1.start()
    nn2.start()
    
    # 启动监控
    failover_manager.start_monitoring()
    
    print("\n=== 初始集群状态 ===")
    status = failover_manager.get_cluster_status()
    for key, value in status.items():
        if key != "namenodes":
            print(f"{key}: {value}")
    
    print("\nNameNode详细状态:")
    for nn_id, nn_status in status["namenodes"].items():
        print(f"  {nn_id}: {nn_status}")
    
    # 模拟Active NameNode故障
    print("\n=== 模拟Active NameNode故障 ===")
    nn1.stop()
    
    # 等待故障转移
    time.sleep(10)
    
    print("\n=== 故障转移后集群状态 ===")
    status = failover_manager.get_cluster_status()
    for key, value in status.items():
        if key != "namenodes":
            print(f"{key}: {value}")
    
    # 停止监控
    failover_manager.stop_monitoring()
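
真实集群中,手动故障转移通常通过hdfs haadmin命令完成,也可以从Python中调用(以下仅为示意,假设集群已配置HA,且nn1、nn2是该NameService下的NameNode ID):

import subprocess

def haadmin(*args: str) -> str:
    """调用hdfs haadmin子命令(要求本机Hadoop客户端已配置HA)"""
    result = subprocess.run(["hdfs", "haadmin", *args],
                            capture_output=True, text=True, check=True)
    return result.stdout

if __name__ == "__main__":
    # 查询两个NameNode的当前状态,再把Active从nn1切换到nn2(节点ID为示例假设)
    print(haadmin("-getServiceState", "nn1"))
    print(haadmin("-getServiceState", "nn2"))
    print(haadmin("-failover", "nn1", "nn2"))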

3.5.2 DataNode故障恢复

DataNode故障恢复相对简单:NameNode根据心跳超时判定节点失效,再依靠副本机制为受影响的数据块补齐副本:

class DataNodeFailureDetector:
    """
    DataNode故障检测器
    """
    
    def __init__(self, namenode):
        self.namenode = namenode
        self.heartbeat_timeout = 30  # 心跳超时时间(秒)
        self.failed_nodes = set()
        self.is_monitoring = False
        self.monitor_thread = None
    
    def start_monitoring(self):
        """
        开始监控DataNode
        """
        print("开始DataNode故障检测...")
        self.is_monitoring = True
        
        self.monitor_thread = threading.Thread(target=self._monitoring_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """
        停止监控
        """
        print("停止DataNode故障检测...")
        self.is_monitoring = False
        
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
    
    def _monitoring_loop(self):
        """
        监控循环
        """
        while self.is_monitoring:
            try:
                current_time = time.time()
                
                # 检查每个DataNode的心跳
                for node_id, node_info in self.namenode.datanode_info.items():
                    last_heartbeat = node_info.get('last_heartbeat', 0)
                    
                    if (current_time - last_heartbeat) > self.heartbeat_timeout:
                        if node_id not in self.failed_nodes:
                            print(f"检测到DataNode {node_id} 故障")
                            self._handle_datanode_failure(node_id)
                    else:
                        if node_id in self.failed_nodes:
                            print(f"DataNode {node_id} 恢复正常")
                            self._handle_datanode_recovery(node_id)
                
                time.sleep(10)  # 每10秒检查一次
                
            except Exception as e:
                print(f"DataNode监控异常: {e}")
    
    def _handle_datanode_failure(self, node_id: str):
        """
        处理DataNode故障
        
        Args:
            node_id: 故障节点ID
        """
        self.failed_nodes.add(node_id)
        
        # 标记节点为不可用
        if node_id in self.namenode.datanode_info:
            self.namenode.datanode_info[node_id]['status'] = 'failed'
        
        # 检查受影响的数据块
        affected_blocks = self._find_affected_blocks(node_id)
        
        print(f"  受影响的数据块数量: {len(affected_blocks)}")
        
        # 触发副本修复
        for block_id in affected_blocks:
            self._schedule_block_replication(block_id, node_id)
    
    def _handle_datanode_recovery(self, node_id: str):
        """
        处理DataNode恢复
        
        Args:
            node_id: 恢复的节点ID
        """
        self.failed_nodes.discard(node_id)
        
        # 标记节点为可用
        if node_id in self.namenode.datanode_info:
            self.namenode.datanode_info[node_id]['status'] = 'active'
        
        # 重新平衡数据块
        self._rebalance_blocks_for_recovered_node(node_id)
    
    def _find_affected_blocks(self, failed_node_id: str) -> List[str]:
        """
        查找受影响的数据块
        
        Args:
            failed_node_id: 故障节点ID
            
        Returns:
            List[str]: 受影响的块ID列表
        """
        affected_blocks = []
        
        # 遍历所有数据块,找到存储在故障节点上的块
        for block_id, block_info in self.namenode.block_locations.items():
            if failed_node_id in block_info.get('locations', []):
                affected_blocks.append(block_id)
        
        return affected_blocks
    
    def _schedule_block_replication(self, block_id: str, failed_node_id: str):
        """
        调度数据块复制
        
        Args:
            block_id: 块ID
            failed_node_id: 故障节点ID
        """
        print(f"    调度块 {block_id} 的副本修复")
        
        block_info = self.namenode.block_locations.get(block_id, {})
        current_locations = block_info.get('locations', [])
        
        # 移除故障节点
        available_locations = [loc for loc in current_locations if loc != failed_node_id]
        
        # 检查副本数是否足够
        required_replicas = block_info.get('replication_factor', 3)
        current_replicas = len(available_locations)
        
        if current_replicas < required_replicas:
            needed_replicas = required_replicas - current_replicas
            print(f"      需要创建 {needed_replicas} 个新副本")
            
            # 选择目标DataNode
            target_nodes = self._select_replication_targets(needed_replicas, available_locations)
            
            for target_node in target_nodes:
                self._replicate_block(block_id, available_locations[0], target_node)
    
    def _select_replication_targets(self, needed_count: int, 
                                  exclude_nodes: List[str]) -> List[str]:
        """
        选择复制目标节点
        
        Args:
            needed_count: 需要的副本数
            exclude_nodes: 排除的节点列表
            
        Returns:
            List[str]: 目标节点列表
        """
        available_nodes = []
        
        for node_id, node_info in self.namenode.datanode_info.items():
            if (node_id not in exclude_nodes and 
                node_info.get('status') == 'active' and
                node_id not in self.failed_nodes):
                available_nodes.append(node_id)
        
        # 简单选择:按可用空间排序
        available_nodes.sort(key=lambda x: self.namenode.datanode_info[x].get('free_space', 0), 
                           reverse=True)
        
        return available_nodes[:needed_count]
    
    def _replicate_block(self, block_id: str, source_node: str, target_node: str):
        """
        复制数据块
        
        Args:
            block_id: 块ID
            source_node: 源节点
            target_node: 目标节点
        """
        print(f"      从 {source_node} 复制块 {block_id} 到 {target_node}")
        
        # 模拟复制过程
        time.sleep(1)  # 模拟复制时间
        
        # 更新块位置信息
        if block_id in self.namenode.block_locations:
            locations = self.namenode.block_locations[block_id].get('locations', [])
            if target_node not in locations:
                locations.append(target_node)
                self.namenode.block_locations[block_id]['locations'] = locations
        
        print(f"        复制完成")
    
    def _rebalance_blocks_for_recovered_node(self, recovered_node_id: str):
        """
        为恢复的节点重新平衡数据块
        
        Args:
            recovered_node_id: 恢复的节点ID
        """
        print(f"  为恢复的节点 {recovered_node_id} 重新平衡数据块")
        
        # 简化实现:这里只是打印信息
        # 实际实现中会根据集群负载情况重新分配数据块
        node_info = self.namenode.datanode_info.get(recovered_node_id, {})
        free_space = node_info.get('free_space', 0)
        
        print(f"    节点可用空间: {free_space / (1024**3):.1f} GB")
        print(f"    可以接收新的数据块副本")

# 使用示例
if __name__ == "__main__":
    # 创建NameNode(简化版本)
    class SimpleNameNode:
        def __init__(self):
            self.datanode_info = {
                "dn1": {"status": "active", "last_heartbeat": time.time(), "free_space": 1000000000},
                "dn2": {"status": "active", "last_heartbeat": time.time(), "free_space": 1000000000},
                "dn3": {"status": "active", "last_heartbeat": time.time(), "free_space": 1000000000}
            }
            self.block_locations = {
                "block_001": {"locations": ["dn1", "dn2", "dn3"], "replication_factor": 3},
                "block_002": {"locations": ["dn1", "dn2"], "replication_factor": 3},
                "block_003": {"locations": ["dn2", "dn3"], "replication_factor": 3}
            }
    
    namenode = SimpleNameNode()
    
    # 创建故障检测器
    detector = DataNodeFailureDetector(namenode)
    detector.start_monitoring()
    
    print("=== 初始状态 ===")
    for node_id, info in namenode.datanode_info.items():
        print(f"{node_id}: {info['status']}")
    
    # 模拟DataNode故障
    print("\n=== 模拟DataNode dn1故障 ===")
    namenode.datanode_info["dn1"]["last_heartbeat"] = time.time() - 60  # 60秒前
    
    # 等待故障检测
    time.sleep(15)
    
    print("\n=== 故障处理后状态 ===")
    for node_id, info in namenode.datanode_info.items():
        print(f"{node_id}: {info['status']}")
    
    print("\n数据块位置信息:")
    for block_id, block_info in namenode.block_locations.items():
        print(f"{block_id}: {block_info['locations']}")
    
    # 模拟DataNode恢复
    print("\n=== 模拟DataNode dn1恢复 ===")
    namenode.datanode_info["dn1"]["last_heartbeat"] = time.time()
    
    # 等待恢复检测
    time.sleep(15)
    
    print("\n=== 恢复后状态 ===")
    for node_id, info in namenode.datanode_info.items():
        print(f"{node_id}: {info['status']}")
    
    detector.stop_monitoring()

3.6 HDFS性能优化

3.6.1 性能监控与调优

import psutil
import threading
import time
from collections import defaultdict, deque
from typing import Dict, List, Tuple

class HDFSPerformanceMonitor:
    """
    HDFS性能监控器
    """
    
    def __init__(self, monitoring_interval: int = 10):
        self.monitoring_interval = monitoring_interval
        self.metrics_history = defaultdict(lambda: deque(maxlen=100))
        self.is_monitoring = False
        self.monitor_thread = None
        self.alert_thresholds = {
            'cpu_usage': 80.0,
            'memory_usage': 85.0,
            'disk_usage': 90.0,
            'network_io': 1000000000,  # 1GB/s
            'hdfs_operations_per_sec': 1000
        }
    
    def start_monitoring(self):
        """
        开始性能监控
        """
        print("开始HDFS性能监控...")
        self.is_monitoring = True
        
        self.monitor_thread = threading.Thread(target=self._monitoring_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
    
    def stop_monitoring(self):
        """
        停止性能监控
        """
        print("停止HDFS性能监控...")
        self.is_monitoring = False
        
        if self.monitor_thread:
            self.monitor_thread.join(timeout=5)
    
    def _monitoring_loop(self):
        """
        监控循环
        """
        while self.is_monitoring:
            try:
                # 收集系统指标
                metrics = self._collect_system_metrics()
                
                # 收集HDFS指标
                hdfs_metrics = self._collect_hdfs_metrics()
                metrics.update(hdfs_metrics)
                
                # 存储指标
                timestamp = time.time()
                for metric_name, value in metrics.items():
                    self.metrics_history[metric_name].append((timestamp, value))
                
                # 检查告警
                self._check_alerts(metrics)
                
                time.sleep(self.monitoring_interval)
                
            except Exception as e:
                print(f"性能监控异常: {e}")
                time.sleep(self.monitoring_interval)  # 出现异常时也按周期等待,避免循环空转
    
    def _collect_system_metrics(self) -> Dict[str, float]:
        """
        收集系统指标
        
        Returns:
            Dict[str, float]: 系统指标
        """
        metrics = {}
        
        # CPU使用率
        metrics['cpu_usage'] = psutil.cpu_percent(interval=1)
        
        # 内存使用率
        memory = psutil.virtual_memory()
        metrics['memory_usage'] = memory.percent
        metrics['memory_available'] = memory.available
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        metrics['disk_usage'] = (disk.used / disk.total) * 100
        metrics['disk_free'] = disk.free
        
        # 网络I/O
        network = psutil.net_io_counters()
        metrics['network_bytes_sent'] = network.bytes_sent
        metrics['network_bytes_recv'] = network.bytes_recv
        
        # 磁盘I/O
        disk_io = psutil.disk_io_counters()
        if disk_io:
            metrics['disk_read_bytes'] = disk_io.read_bytes
            metrics['disk_write_bytes'] = disk_io.write_bytes
            metrics['disk_read_count'] = disk_io.read_count
            metrics['disk_write_count'] = disk_io.write_count
        
        return metrics
    
    def _collect_hdfs_metrics(self) -> Dict[str, float]:
        """
        收集HDFS指标
        
        Returns:
            Dict[str, float]: HDFS指标
        """
        # 模拟HDFS指标收集
        # 实际实现中会从NameNode和DataNode的JMX接口获取
        
        metrics = {
            'hdfs_operations_per_sec': self._get_hdfs_ops_rate(),
            'hdfs_blocks_total': self._get_total_blocks(),
            'hdfs_blocks_corrupt': self._get_corrupt_blocks(),
            'hdfs_blocks_missing': self._get_missing_blocks(),
            'hdfs_capacity_used': self._get_capacity_used(),
            'hdfs_capacity_remaining': self._get_capacity_remaining(),
            'hdfs_files_total': self._get_total_files(),
            'hdfs_datanode_count': self._get_datanode_count(),
            'hdfs_namenode_heap_used': self._get_namenode_heap_usage()
        }
        
        return metrics
    
    def _get_hdfs_ops_rate(self) -> float:
        """获取HDFS操作速率"""
        # 模拟数据
        import random
        return random.uniform(100, 500)
    
    def _get_total_blocks(self) -> int:
        """获取总块数"""
        return 1000000
    
    def _get_corrupt_blocks(self) -> int:
        """获取损坏块数"""
        import random
        return random.randint(0, 5)
    
    def _get_missing_blocks(self) -> int:
        """获取丢失块数"""
        import random
        return random.randint(0, 2)
    
    def _get_capacity_used(self) -> float:
        """获取已使用容量(字节)"""
        return 5000000000000  # 5TB
    
    def _get_capacity_remaining(self) -> float:
        """获取剩余容量(字节)"""
        return 3000000000000  # 3TB
    
    def _get_total_files(self) -> int:
        """获取总文件数"""
        return 500000
    
    def _get_datanode_count(self) -> int:
        """获取DataNode数量"""
        return 10
    
    def _get_namenode_heap_usage(self) -> float:
        """获取NameNode堆内存使用率"""
        import random
        return random.uniform(60, 85)
    
    def _check_alerts(self, metrics: Dict[str, float]):
        """
        检查告警
        
        Args:
            metrics: 当前指标
        """
        for metric_name, threshold in self.alert_thresholds.items():
            if metric_name in metrics:
                value = metrics[metric_name]
                if value > threshold:
                    self._trigger_alert(metric_name, value, threshold)
    
    def _trigger_alert(self, metric_name: str, current_value: float, threshold: float):
        """
        触发告警
        
        Args:
            metric_name: 指标名称
            current_value: 当前值
            threshold: 阈值
        """
        print(f"🚨 告警: {metric_name} = {current_value:.2f}, 超过阈值 {threshold}")
        
        # 这里可以添加告警通知逻辑,如发送邮件、短信等
    
    def get_performance_report(self) -> dict:
        """
        生成性能报告
        
        Returns:
            dict: 性能报告
        """
        report = {
            'timestamp': time.time(),
            'metrics_summary': {},
            'recommendations': []
        }
        
        # 计算指标摘要
        for metric_name, history in self.metrics_history.items():
            if history:
                values = [value for _, value in history]
                report['metrics_summary'][metric_name] = {
                    'current': values[-1] if values else 0,
                    'average': sum(values) / len(values),
                    'min': min(values),
                    'max': max(values),
                    'samples': len(values)
                }
        
        # 生成优化建议
        report['recommendations'] = self._generate_recommendations(report['metrics_summary'])
        
        return report
    
    def _generate_recommendations(self, metrics_summary: dict) -> List[str]:
        """
        生成优化建议
        
        Args:
            metrics_summary: 指标摘要
            
        Returns:
            List[str]: 优化建议列表
        """
        recommendations = []
        
        # CPU使用率建议
        cpu_usage = metrics_summary.get('cpu_usage', {}).get('average', 0)
        if cpu_usage > 80:
            recommendations.append("CPU使用率过高,建议增加DataNode节点或优化MapReduce作业")
        
        # 内存使用率建议
        memory_usage = metrics_summary.get('memory_usage', {}).get('average', 0)
        if memory_usage > 85:
            recommendations.append("内存使用率过高,建议增加内存或调整JVM堆大小")
        
        # 磁盘使用率建议
        disk_usage = metrics_summary.get('disk_usage', {}).get('average', 0)
        if disk_usage > 90:
            recommendations.append("磁盘使用率过高,建议清理旧数据或增加存储容量")
        
        # HDFS块建议
        corrupt_blocks = metrics_summary.get('hdfs_blocks_corrupt', {}).get('current', 0)
        if corrupt_blocks > 0:
            recommendations.append(f"发现{corrupt_blocks}个损坏块,建议运行fsck检查并修复")
        
        missing_blocks = metrics_summary.get('hdfs_blocks_missing', {}).get('current', 0)
        if missing_blocks > 0:
            recommendations.append(f"发现{missing_blocks}个丢失块,建议检查DataNode状态")
        
        # NameNode堆内存建议
        namenode_heap = metrics_summary.get('hdfs_namenode_heap_used', {}).get('average', 0)
        if namenode_heap > 80:
            recommendations.append("NameNode堆内存使用率过高,建议增加堆大小或启用Federation")
        
        return recommendations
    
    def export_metrics(self, filename: str, format: str = 'csv'):
        """
        导出指标数据
        
        Args:
            filename: 文件名
            format: 导出格式(csv, json)
        """
        if format == 'csv':
            self._export_csv(filename)
        elif format == 'json':
            self._export_json(filename)
        else:
            raise ValueError(f"不支持的导出格式: {format}")
    
    def _export_csv(self, filename: str):
        """
        导出CSV格式
        
        Args:
            filename: 文件名
        """
        import csv
        
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            
            # 写入表头
            headers = ['timestamp'] + list(self.metrics_history.keys())
            writer.writerow(headers)
            
            # 获取所有时间戳
            all_timestamps = set()
            for history in self.metrics_history.values():
                for timestamp, _ in history:
                    all_timestamps.add(timestamp)
            
            # 按时间戳排序
            sorted_timestamps = sorted(all_timestamps)
            
            # 写入数据
            for timestamp in sorted_timestamps:
                row = [timestamp]
                for metric_name in self.metrics_history.keys():
                    # 查找该时间戳的值
                    value = None
                    for ts, val in self.metrics_history[metric_name]:
                        if ts == timestamp:
                            value = val
                            break
                    row.append(value if value is not None else '')
                writer.writerow(row)
        
        print(f"指标数据已导出到 {filename}")
    
    def _export_json(self, filename: str):
        """
        导出JSON格式
        
        Args:
            filename: 文件名
        """
        import json
        
        data = {
            'export_time': time.time(),
            'metrics': {}
        }
        
        for metric_name, history in self.metrics_history.items():
            data['metrics'][metric_name] = [
                {'timestamp': ts, 'value': val} for ts, val in history
            ]
        
        with open(filename, 'w') as jsonfile:
            json.dump(data, jsonfile, indent=2)
        
        print(f"指标数据已导出到 {filename}")

# 使用示例
if __name__ == "__main__":
    # 创建性能监控器
    monitor = HDFSPerformanceMonitor(monitoring_interval=5)
    
    # 开始监控
    monitor.start_monitoring()
    
    print("性能监控已启动,运行30秒...")
    time.sleep(30)
    
    # 生成性能报告
    print("\n=== 性能报告 ===")
    report = monitor.get_performance_report()
    
    print("指标摘要:")
    for metric_name, summary in report['metrics_summary'].items():
        print(f"  {metric_name}:")
        print(f"    当前值: {summary['current']:.2f}")
        print(f"    平均值: {summary['average']:.2f}")
        print(f"    最小值: {summary['min']:.2f}")
        print(f"    最大值: {summary['max']:.2f}")
    
    print("\n优化建议:")
    for recommendation in report['recommendations']:
        print(f"  • {recommendation}")
    
    # 导出指标数据
    monitor.export_metrics('hdfs_metrics.csv', 'csv')
    monitor.export_metrics('hdfs_metrics.json', 'json')
    
    # 停止监控
    monitor.stop_monitoring()
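
上面的监控器使用随机数模拟HDFS指标。在真实集群中,这些指标通常可以通过NameNode Web UI暴露的/jmx接口获取。下面给出一个示意性的采集函数:其中的NameNode地址(http://namenode:9870,Hadoop 3.x默认端口)为假设值,具体指标字段名请以实际JMX输出为准。

import json
import urllib.request

def fetch_fsnamesystem_metrics(namenode_http: str = "http://namenode:9870") -> dict:
    """
    通过NameNode的/jmx接口采集FSNamesystem指标(示意实现)
    
    Args:
        namenode_http: NameNode Web UI地址(假设值)
        
    Returns:
        dict: 与前文监控器命名对应的部分指标
    """
    url = f"{namenode_http}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystem"
    with urllib.request.urlopen(url, timeout=10) as resp:
        data = json.loads(resp.read().decode("utf-8"))
    
    beans = data.get("beans", [])
    if not beans:
        return {}
    
    bean = beans[0]
    # 只挑选与_collect_hdfs_metrics命名对应的几个常用指标
    return {
        "hdfs_capacity_used": bean.get("CapacityUsed", 0),
        "hdfs_capacity_remaining": bean.get("CapacityRemaining", 0),
        "hdfs_blocks_total": bean.get("BlocksTotal", 0),
        "hdfs_blocks_missing": bean.get("MissingBlocks", 0),
        "hdfs_blocks_corrupt": bean.get("CorruptBlocks", 0),
        "hdfs_files_total": bean.get("FilesTotal", 0),
    }

实际部署时,可以在_collect_hdfs_metrics中调用类似函数替换模拟数据;DataNode侧指标同样可以通过各DataNode的/jmx接口获取。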

3.6.2 性能优化策略

class HDFSPerformanceOptimizer:
    """
    HDFS性能优化器
    """
    
    def __init__(self):
        self.optimization_strategies = {
            'block_size': self._optimize_block_size,
            'replication_factor': self._optimize_replication_factor,
            'namenode_memory': self._optimize_namenode_memory,
            'datanode_threads': self._optimize_datanode_threads,
            'network_topology': self._optimize_network_topology,
            'compression': self._optimize_compression,
            'balancer': self._optimize_balancer
        }
    
    def analyze_cluster_performance(self, cluster_info: dict) -> dict:
        """
        分析集群性能
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            dict: 性能分析结果
        """
        analysis = {
            'cluster_overview': self._analyze_cluster_overview(cluster_info),
            'bottlenecks': self._identify_bottlenecks(cluster_info),
            'optimization_recommendations': []
        }
        
        # 生成优化建议
        for strategy_name, strategy_func in self.optimization_strategies.items():
            recommendations = strategy_func(cluster_info)
            if recommendations:
                analysis['optimization_recommendations'].extend(recommendations)
        
        return analysis
    
    def _analyze_cluster_overview(self, cluster_info: dict) -> dict:
        """
        分析集群概览
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            dict: 集群概览分析
        """
        overview = {
            'total_capacity': cluster_info.get('total_capacity', 0),
            'used_capacity': cluster_info.get('used_capacity', 0),
            'utilization_rate': 0,
            'datanode_count': cluster_info.get('datanode_count', 0),
            'average_load': 0,
            'health_status': 'unknown'
        }
        
        # 计算利用率
        if overview['total_capacity'] > 0:
            overview['utilization_rate'] = (overview['used_capacity'] / overview['total_capacity']) * 100
        
        # 计算平均负载
        datanodes = cluster_info.get('datanodes', [])
        if datanodes:
            total_load = sum(dn.get('cpu_usage', 0) for dn in datanodes)
            overview['average_load'] = total_load / len(datanodes)
        
        # 评估健康状态
        if overview['utilization_rate'] < 80 and overview['average_load'] < 70:
            overview['health_status'] = 'healthy'
        elif overview['utilization_rate'] < 90 and overview['average_load'] < 85:
            overview['health_status'] = 'warning'
        else:
            overview['health_status'] = 'critical'
        
        return overview
    
    def _identify_bottlenecks(self, cluster_info: dict) -> List[dict]:
        """
        识别性能瓶颈
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[dict]: 瓶颈列表
        """
        bottlenecks = []
        
        # 检查NameNode瓶颈
        namenode_info = cluster_info.get('namenode', {})
        if namenode_info.get('heap_usage', 0) > 80:
            bottlenecks.append({
                'type': 'namenode_memory',
                'severity': 'high',
                'description': 'NameNode内存使用率过高',
                'current_value': namenode_info.get('heap_usage', 0),
                'threshold': 80
            })
        
        # 检查DataNode瓶颈
        datanodes = cluster_info.get('datanodes', [])
        overloaded_nodes = [dn for dn in datanodes if dn.get('cpu_usage', 0) > 85]
        if overloaded_nodes:
            bottlenecks.append({
                'type': 'datanode_cpu',
                'severity': 'medium',
                'description': f'{len(overloaded_nodes)}个DataNode CPU使用率过高',
                'affected_nodes': [dn.get('id') for dn in overloaded_nodes]
            })
        
        # 检查网络瓶颈
        network_usage = cluster_info.get('network_usage', 0)
        if network_usage > 80:
            bottlenecks.append({
                'type': 'network',
                'severity': 'high',
                'description': '网络带宽使用率过高',
                'current_value': network_usage,
                'threshold': 80
            })
        
        # 检查磁盘瓶颈
        disk_full_nodes = [dn for dn in datanodes if dn.get('disk_usage', 0) > 90]
        if disk_full_nodes:
            bottlenecks.append({
                'type': 'disk_space',
                'severity': 'critical',
                'description': f'{len(disk_full_nodes)}个DataNode磁盘空间不足',
                'affected_nodes': [dn.get('id') for dn in disk_full_nodes]
            })
        
        return bottlenecks
    
    def _optimize_block_size(self, cluster_info: dict) -> List[str]:
        """
        优化块大小
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        current_block_size = cluster_info.get('block_size', 128 * 1024 * 1024)  # 默认128MB
        file_sizes = cluster_info.get('file_sizes', [])
        
        if file_sizes:
            avg_file_size = sum(file_sizes) / len(file_sizes)
            
            # 如果平均文件大小远小于块大小
            if avg_file_size < current_block_size * 0.1:
                recommended_size = max(64 * 1024 * 1024, int(avg_file_size * 2))  # 最小64MB
                recommendations.append(
                    f"当前块大小({current_block_size // (1024*1024)}MB)对于平均文件大小"
                    f"({avg_file_size // (1024*1024)}MB)过大,建议调整为{recommended_size // (1024*1024)}MB"
                )
            
            # 如果平均文件大小远大于块大小
            elif avg_file_size > current_block_size * 10:
                recommended_size = min(1024 * 1024 * 1024, int(avg_file_size * 0.1))  # 最大1GB
                recommendations.append(
                    f"当前块大小({current_block_size // (1024*1024)}MB)对于平均文件大小"
                    f"({avg_file_size // (1024*1024)}MB)过小,建议调整为{recommended_size // (1024*1024)}MB"
                )
        
        return recommendations
    
    def _optimize_replication_factor(self, cluster_info: dict) -> List[str]:
        """
        优化副本因子
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        datanode_count = cluster_info.get('datanode_count', 0)
        current_replication = cluster_info.get('replication_factor', 3)
        
        # 根据集群大小调整副本因子
        if datanode_count < 3 and current_replication > datanode_count:
            recommendations.append(
                f"集群只有{datanode_count}个DataNode,建议将副本因子调整为{datanode_count}"
            )
        elif datanode_count >= 10 and current_replication == 3:
            recommendations.append(
                f"大型集群({datanode_count}个DataNode)可以考虑将关键数据的副本因子调整为4或5"
            )
        
        # 检查存储利用率
        utilization = cluster_info.get('utilization_rate', 0)
        if utilization > 85 and current_replication > 2:
            recommendations.append(
                f"存储利用率过高({utilization:.1f}%),可以考虑降低非关键数据的副本因子"
            )
        
        return recommendations
    
    def _optimize_namenode_memory(self, cluster_info: dict) -> List[str]:
        """
        优化NameNode内存
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        namenode_info = cluster_info.get('namenode', {})
        heap_usage = namenode_info.get('heap_usage', 0)
        total_files = cluster_info.get('total_files', 0)
        total_blocks = cluster_info.get('total_blocks', 0)
        
        # 根据文件和块数量估算内存需求
        estimated_memory_mb = (total_files * 0.15 + total_blocks * 0.3) / 1024  # 经验公式
        current_heap_mb = namenode_info.get('heap_size_mb', 1024)
        
        if heap_usage > 80:
            recommended_heap = max(current_heap_mb * 1.5, estimated_memory_mb * 1.2)
            recommendations.append(
                f"NameNode堆内存使用率过高({heap_usage:.1f}%),建议将堆大小从"
                f"{current_heap_mb}MB增加到{recommended_heap:.0f}MB"
            )
        
        # 检查是否需要启用Federation
        if total_files > 100000000:  # 1亿个文件
            recommendations.append(
                f"文件数量过多({total_files:,}),建议考虑启用HDFS Federation来分散NameNode负载"
            )
        
        return recommendations
    
    def _optimize_datanode_threads(self, cluster_info: dict) -> List[str]:
        """
        优化DataNode线程配置
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        datanodes = cluster_info.get('datanodes', [])
        high_load_nodes = [dn for dn in datanodes if dn.get('cpu_usage', 0) > 80]
        
        if high_load_nodes:
            recommendations.append(
                f"{len(high_load_nodes)}个DataNode负载过高,建议调整以下参数:\n"
                "  - dfs.datanode.max.transfer.threads: 增加到8192\n"
                "  - dfs.datanode.handler.count: 增加到20\n"
                "  - dfs.datanode.max.xcievers: 增加到8192"
            )
        
        # 检查网络线程配置
        network_usage = cluster_info.get('network_usage', 0)
        if network_usage > 70:
            recommendations.append(
                "网络使用率较高,建议优化网络相关参数:\n"
                "  - ipc.server.handler.queue.size: 增加到1000\n"
                "  - ipc.server.read.threadpool.size: 增加到5"
            )
        
        return recommendations
    
    def _optimize_network_topology(self, cluster_info: dict) -> List[str]:
        """
        优化网络拓扑
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        datanodes = cluster_info.get('datanodes', [])
        racks = set(dn.get('rack', 'default') for dn in datanodes)
        
        # 检查机架感知配置
        if len(racks) == 1 and len(datanodes) > 3:
            recommendations.append(
                "检测到所有DataNode在同一机架,建议配置机架感知以提高容错性和性能"
            )
        
        # 检查副本分布
        if len(racks) > 1:
            recommendations.append(
                "已配置多机架环境,确保副本分布策略正确:\n"
                "  - 第一个副本:本地节点\n"
                "  - 第二个副本:不同机架的节点\n"
                "  - 第三个副本:第二个副本同机架的不同节点"
            )
        
        return recommendations
    
    def _optimize_compression(self, cluster_info: dict) -> List[str]:
        """
        优化压缩配置
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        # 检查存储利用率
        utilization = cluster_info.get('utilization_rate', 0)
        if utilization > 70:
            recommendations.append(
                "存储利用率较高,建议启用压缩以节省空间:\n"
                "  - 对于日志文件:使用LZO或Snappy压缩\n"
                "  - 对于归档数据:使用GZIP或BZIP2压缩\n"
                "  - 对于实时数据:使用Snappy压缩"
            )
        
        # 检查网络使用率
        network_usage = cluster_info.get('network_usage', 0)
        if network_usage > 60:
            recommendations.append(
                "网络使用率较高,压缩可以减少网络传输:\n"
                "  - 启用MapReduce中间结果压缩\n"
                "  - 启用RPC压缩"
            )
        
        return recommendations
    
    def _optimize_balancer(self, cluster_info: dict) -> List[str]:
        """
        优化数据平衡
        
        Args:
            cluster_info: 集群信息
            
        Returns:
            List[str]: 优化建议
        """
        recommendations = []
        
        datanodes = cluster_info.get('datanodes', [])
        if not datanodes:
            return recommendations
        
        # 计算数据分布不均衡程度
        utilizations = [dn.get('disk_usage', 0) for dn in datanodes]
        avg_utilization = sum(utilizations) / len(utilizations)
        max_deviation = max(abs(u - avg_utilization) for u in utilizations)
        
        if max_deviation > 10:  # 偏差超过10%
            recommendations.append(
                f"数据分布不均衡,最大偏差{max_deviation:.1f}%,建议运行Balancer:\n"
                "  - 设置合适的带宽限制:hdfs balancer -bandwidth 100\n"
                "  - 设置阈值:hdfs balancer -threshold 5"
            )
        
        # 检查新增节点
        new_nodes = [dn for dn in datanodes if dn.get('is_new', False)]
        if new_nodes:
            recommendations.append(
                f"检测到{len(new_nodes)}个新增DataNode,建议运行Balancer重新分布数据"
            )
        
        return recommendations
    
    def generate_optimization_script(self, recommendations: List[str]) -> str:
        """
        生成优化脚本
        
        Args:
            recommendations: 优化建议列表
            
        Returns:
            str: 优化脚本
        """
        script_lines = [
            "#!/bin/bash",
            "# HDFS性能优化脚本",
            "# 自动生成于: " + time.strftime('%Y-%m-%d %H:%M:%S'),
            "",
            "echo '开始HDFS性能优化...'",
            ""
        ]
        
        for i, recommendation in enumerate(recommendations, 1):
            # 建议文本可能包含换行和中文逗号:取第一行首句作为步骤标题,
            # 其余内容逐行转为注释,避免生成的脚本中出现跨行的echo语句
            summary = recommendation.splitlines()[0].split(',')[0]
            script_lines.extend(
                [f"echo '步骤{i}: {summary}'"]
                + [f"# {line}" for line in recommendation.splitlines()]
                + ["# TODO: 根据具体建议添加相应的配置命令", ""]
            )
        
        script_lines.extend([
            "echo 'HDFS性能优化完成'",
            "echo '请重启相关服务使配置生效'"
        ])
        
        return "\n".join(script_lines)

# 使用示例
if __name__ == "__main__":
    # 模拟集群信息
    cluster_info = {
        'total_capacity': 10 * 1024**4,  # 10TB
        'used_capacity': 8 * 1024**4,    # 8TB
        'datanode_count': 5,
        'block_size': 128 * 1024 * 1024,  # 128MB
        'replication_factor': 3,
        'total_files': 1000000,
        'total_blocks': 5000000,
        'network_usage': 75,
        'utilization_rate': 80,
        'namenode': {
            'heap_usage': 85,
            'heap_size_mb': 2048
        },
        'datanodes': [
            {'id': 'dn1', 'cpu_usage': 70, 'disk_usage': 85, 'rack': 'rack1'},
            {'id': 'dn2', 'cpu_usage': 90, 'disk_usage': 75, 'rack': 'rack1'},
            {'id': 'dn3', 'cpu_usage': 60, 'disk_usage': 80, 'rack': 'rack2'},
            {'id': 'dn4', 'cpu_usage': 85, 'disk_usage': 70, 'rack': 'rack2'},
            {'id': 'dn5', 'cpu_usage': 55, 'disk_usage': 95, 'rack': 'rack2', 'is_new': True}
        ],
        'file_sizes': [50*1024*1024, 200*1024*1024, 1024*1024*1024]  # 50MB, 200MB, 1GB
    }
    
    # 创建性能优化器
    optimizer = HDFSPerformanceOptimizer()
    
    # 分析集群性能
    analysis = optimizer.analyze_cluster_performance(cluster_info)
    
    print("=== HDFS集群性能分析 ===")
    print("\n集群概览:")
    overview = analysis['cluster_overview']
    print(f"  总容量: {overview['total_capacity'] / (1024**4):.1f} TB")
    print(f"  已用容量: {overview['used_capacity'] / (1024**4):.1f} TB")
    print(f"  利用率: {overview['utilization_rate']:.1f}%")
    print(f"  DataNode数量: {overview['datanode_count']}")
    print(f"  平均负载: {overview['average_load']:.1f}%")
    print(f"  健康状态: {overview['health_status']}")
    
    print("\n性能瓶颈:")
    for bottleneck in analysis['bottlenecks']:
        print(f"  • [{bottleneck['severity'].upper()}] {bottleneck['description']}")
        if 'current_value' in bottleneck:
            print(f"    当前值: {bottleneck['current_value']}, 阈值: {bottleneck['threshold']}")
        if 'affected_nodes' in bottleneck:
            print(f"    受影响节点: {', '.join(bottleneck['affected_nodes'])}")
    
    print("\n优化建议:")
    for i, recommendation in enumerate(analysis['optimization_recommendations'], 1):
        print(f"  {i}. {recommendation}")
    
    # 生成优化脚本
    script = optimizer.generate_optimization_script(analysis['optimization_recommendations'])
    
    print("\n=== 优化脚本 ===")
    print(script)
    
    # 保存脚本到文件
    with open('hdfs_optimization.sh', 'w') as f:
        f.write(script)
    print("\n优化脚本已保存到 hdfs_optimization.sh")

3.7 HDFS最佳实践与总结

3.7.1 HDFS最佳实践

设计原则

  1. 合理规划块大小

    • 大文件使用大块(256MB-1GB)
    • 小文件合并后存储
    • 避免大量小文件
  2. 优化副本策略

    • 根据数据重要性调整副本数
    • 配置机架感知提高容错性
    • 考虑存储成本和可靠性平衡
  3. 合理配置NameNode

    • 充足的内存配置
    • 高可用配置
    • 定期备份元数据
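
下面用一个简化的Python示例把上述三条设计原则落成可执行的检查逻辑。其中的分段阈值(如256MB、512MB)和1.5倍内存缓冲均为经验性假设,实际取值应结合业务数据特征调整。

def suggest_hdfs_layout(avg_file_size_bytes: int, datanode_count: int,
                        total_files: int, total_blocks: int) -> dict:
    """
    根据平均文件大小、节点数量和元数据规模给出基础配置建议(示意实现)
    
    Returns:
        dict: 建议的块大小、副本数和NameNode堆内存
    """
    mb = 1024 * 1024
    
    # 原则1:大文件使用大块(阈值为经验性假设)
    if avg_file_size_bytes >= 1024 * mb:
        block_size = 512 * mb
    elif avg_file_size_bytes >= 256 * mb:
        block_size = 256 * mb
    else:
        block_size = 128 * mb
    
    # 原则2:副本数不超过DataNode数量,默认取3
    replication = min(3, max(1, datanode_count))
    
    # 原则3:按"文件数×150字节 + 块数×300字节"估算NameNode堆内存,并预留50%缓冲
    heap_gb = (total_files * 150 + total_blocks * 300) / 1024 ** 3 * 1.5
    
    return {
        "dfs.blocksize": block_size,
        "dfs.replication": replication,
        "namenode_heap_gb": round(heap_gb, 1),
    }

# 示例:平均文件300MB、5个DataNode、100万文件、500万块
print(suggest_hdfs_layout(300 * 1024 * 1024, 5, 1000000, 5000000))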

运维最佳实践

class HDFSBestPractices:
    """
    HDFS最佳实践指南
    """
    
    def __init__(self):
        self.practices = {
            'file_management': self._file_management_practices,
            'performance': self._performance_practices,
            'security': self._security_practices,
            'monitoring': self._monitoring_practices,
            'backup': self._backup_practices
        }
    
    def get_all_practices(self) -> dict:
        """
        获取所有最佳实践
        
        Returns:
            dict: 最佳实践指南
        """
        all_practices = {}
        
        for category, practice_func in self.practices.items():
            all_practices[category] = practice_func()
        
        return all_practices
    
    def _file_management_practices(self) -> List[dict]:
        """
        文件管理最佳实践
        
        Returns:
            List[dict]: 文件管理实践列表
        """
        return [
            {
                'title': '避免小文件问题',
                'description': '大量小文件会消耗NameNode内存',
                'recommendations': [
                    '使用SequenceFile或HAR归档小文件',
                    '设置合理的文件合并策略',
                    '定期清理临时文件和日志文件'
                ],
                'commands': [
                    'hadoop archive -archiveName files.har -p /input /output',
                    'hdfs dfs -find /path -name "*.tmp" | xargs -r hdfs dfs -rm'
                ]
            },
            {
                'title': '优化目录结构',
                'description': '合理的目录结构提高访问效率',
                'recommendations': [
                    '按时间分区组织数据(年/月/日)',
                    '按业务类型分类存储',
                    '避免单个目录下文件过多'
                ],
                'examples': [
                    '/data/logs/2024/01/15/',
                    '/data/user_data/region=us/date=2024-01-15/'
                ]
            },
            {
                'title': '设置合适的权限',
                'description': '确保数据安全和访问控制',
                'recommendations': [
                    '使用最小权限原则',
                    '定期审查用户权限',
                    '为敏感数据设置加密'
                ],
                'commands': [
                    'hdfs dfs -chmod 750 /sensitive_data',
                    'hdfs dfs -chown user:group /user_data'
                ]
            }
        ]
    
    def _performance_practices(self) -> List[dict]:
        """
        性能优化最佳实践
        
        Returns:
            List[dict]: 性能优化实践列表
        """
        return [
            {
                'title': 'NameNode内存优化',
                'description': 'NameNode是HDFS的核心,需要充足内存',
                'recommendations': [
                    '根据文件数量估算内存需求',
                    '启用NameNode高可用',
                    '定期创建检查点'
                ],
                'formulas': [
                    '内存需求(GB) ≈ (文件数 × 150字节 + 块数 × 300字节) / 1024³',
                    '建议内存 = 估算内存 × 1.5(预留缓冲)'
                ]
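                # 示例(假设的数字):集群有1亿个文件和1亿个数据块时,
                # 估算内存 ≈ (1e8×150 + 1e8×300) / 1024³ ≈ 42GB,按1.5倍预留约需63GB堆内存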
            },
            {
                'title': 'DataNode优化',
                'description': '优化DataNode配置提高吞吐量',
                'recommendations': [
                    '增加并发传输线程数',
                    '优化磁盘I/O配置',
                    '配置合适的心跳间隔'
                ],
                'configurations': [
                    'dfs.datanode.max.transfer.threads=8192',
                    'dfs.datanode.handler.count=20',
                    'dfs.heartbeat.interval=3'
                ]
            },
            {
                'title': '网络优化',
                'description': '优化网络配置减少延迟',
                'recommendations': [
                    '配置机架感知',
                    '优化网络拓扑',
                    '启用数据压缩'
                ],
                'configurations': [
                    'net.topology.script.file.name=/path/to/rack-topology.sh',
                    'io.compression.codecs=org.apache.hadoop.io.compress.SnappyCodec'
                ]
            }
        ]
    
    def _security_practices(self) -> List[dict]:
        """
        安全最佳实践
        
        Returns:
            List[dict]: 安全实践列表
        """
        return [
            {
                'title': 'Kerberos认证',
                'description': '启用Kerberos提供强认证',
                'recommendations': [
                    '配置Kerberos KDC',
                    '为所有服务创建principal',
                    '定期更新keytab文件'
                ],
                'steps': [
                    '1. 安装和配置KDC',
                    '2. 创建Hadoop服务principal',
                    '3. 生成keytab文件',
                    '4. 配置Hadoop使用Kerberos'
                ]
            },
            {
                'title': '数据加密',
                'description': '保护敏感数据',
                'recommendations': [
                    '启用传输加密(TLS)',
                    '启用静态数据加密',
                    '使用透明数据加密(TDE)'
                ],
                'configurations': [
                    'hadoop.ssl.require.client.cert=true',
                    'dfs.encrypt.data.transfer=true'
                ]
            },
            {
                'title': '访问控制',
                'description': '细粒度的访问控制',
                'recommendations': [
                    '启用Ranger进行统一授权',
                    '配置ACL(访问控制列表)',
                    '实施数据分类和标签'
                ],
                'commands': [
                    'hdfs dfs -setfacl -m user:alice:rwx /data',
                    'hdfs dfs -getfacl /data'
                ]
            }
        ]
    
    def _monitoring_practices(self) -> List[dict]:
        """
        监控最佳实践
        
        Returns:
            List[dict]: 监控实践列表
        """
        return [
            {
                'title': '关键指标监控',
                'description': '监控HDFS关键性能指标',
                'metrics': [
                    'NameNode堆内存使用率',
                    'DataNode磁盘使用率',
                    'HDFS容量利用率',
                    '损坏和丢失的块数量',
                    '网络I/O和磁盘I/O'
                ],
                'tools': [
                    'Ambari/Cloudera Manager',
                    'Prometheus + Grafana',
                    'Ganglia',
                    'Nagios'
                ]
            },
            {
                'title': '日志监控',
                'description': '监控系统日志发现问题',
                'recommendations': [
                    '集中化日志收集',
                    '设置关键错误告警',
                    '定期分析日志模式'
                ],
                'log_locations': [
                    '$HADOOP_HOME/logs/hadoop-*-namenode-*.log',
                    '$HADOOP_HOME/logs/hadoop-*-datanode-*.log'
                ]
            },
            {
                'title': '健康检查',
                'description': '定期执行健康检查',
                'recommendations': [
                    '运行fsck检查文件系统',
                    '检查DataNode状态',
                    '验证副本完整性'
                ],
                'commands': [
                    'hdfs fsck / -files -blocks -locations',
                    'hdfs dfsadmin -report',
                    'hdfs dfsadmin -printTopology'
                ]
            }
        ]
    
    def _backup_practices(self) -> List[dict]:
        """
        备份最佳实践
        
        Returns:
            List[dict]: 备份实践列表
        """
        return [
            {
                'title': 'NameNode元数据备份',
                'description': 'NameNode元数据是HDFS的核心',
                'recommendations': [
                    '配置多个NameNode目录',
                    '定期备份fsimage和edits',
                    '异地备份元数据'
                ],
                'configurations': [
                    'dfs.namenode.name.dir=/data1/nn,/data2/nn',
                    'dfs.namenode.checkpoint.period=3600'
                ]
            },
            {
                'title': '数据备份策略',
                'description': '制定合适的数据备份策略',
                'recommendations': [
                    '根据数据重要性分级备份',
                    '使用DistCp进行集群间复制',
                    '定期验证备份完整性'
                ],
                'commands': [
                    'hadoop distcp /source hdfs://backup-cluster/target',
                    'hdfs dfs -checksum /path/to/file'
                ]
            },
            {
                'title': '灾难恢复',
                'description': '制定灾难恢复计划',
                'recommendations': [
                    '文档化恢复流程',
                    '定期演练恢复过程',
                    '准备备用硬件资源'
                ],
                'recovery_steps': [
                    '1. 评估损坏程度',
                    '2. 恢复NameNode元数据',
                    '3. 启动安全模式',
                    '4. 修复损坏的块',
                    '5. 退出安全模式'
                ]
            }
        ]
    
    def generate_checklist(self, category: str = None) -> str:
        """
        生成最佳实践检查清单
        
        Args:
            category: 实践类别,None表示所有类别
            
        Returns:
            str: 检查清单
        """
        checklist_lines = [
            "# HDFS最佳实践检查清单",
            f"生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}",
            ""
        ]
        
        practices = self.get_all_practices()
        
        if category and category in practices:
            practices = {category: practices[category]}
        
        for cat_name, cat_practices in practices.items():
            checklist_lines.append(f"## {cat_name.replace('_', ' ').title()}")
            checklist_lines.append("")
            
            for practice in cat_practices:
                checklist_lines.append(f"### {practice['title']}")
                checklist_lines.append(f"**说明**: {practice['description']}")
                checklist_lines.append("")
                
                if 'recommendations' in practice:
                    checklist_lines.append("**检查项目**:")
                    for rec in practice['recommendations']:
                        checklist_lines.append(f"- [ ] {rec}")
                    checklist_lines.append("")
                
                if 'commands' in practice:
                    checklist_lines.append("**相关命令**:")
                    for cmd in practice['commands']:
                        checklist_lines.append(f"```bash\n{cmd}\n```")
                    checklist_lines.append("")
                
                checklist_lines.append("---")
                checklist_lines.append("")
        
        return "\n".join(checklist_lines)

# 使用示例
if __name__ == "__main__":
    # 创建最佳实践指南
    best_practices = HDFSBestPractices()
    
    # 获取所有最佳实践
    all_practices = best_practices.get_all_practices()
    
    print("=== HDFS最佳实践指南 ===")
    
    for category, practices in all_practices.items():
        print(f"\n{category.replace('_', ' ').title()}:")
        
        for practice in practices:
            print(f"\n  • {practice['title']}")
            print(f"    {practice['description']}")
            
            if 'recommendations' in practice:
                print("    建议:")
                for rec in practice['recommendations'][:2]:  # 只显示前2个建议
                    print(f"      - {rec}")
    
    # 生成检查清单
    print("\n=== 生成检查清单 ===")
    checklist = best_practices.generate_checklist('performance')
    
    # 保存检查清单
    with open('hdfs_best_practices_checklist.md', 'w', encoding='utf-8') as f:
        f.write(checklist)
    
    print("性能优化检查清单已保存到 hdfs_best_practices_checklist.md")
    
    # 生成完整检查清单
    full_checklist = best_practices.generate_checklist()
    with open('hdfs_full_checklist.md', 'w', encoding='utf-8') as f:
        f.write(full_checklist)
    
    print("完整最佳实践检查清单已保存到 hdfs_full_checklist.md")

3.7.2 总结

HDFS作为Hadoop生态系统的核心组件,为大数据存储提供了可靠、可扩展的解决方案。通过本章的学习,我们深入了解了:

核心概念

  • 分布式架构: 主从架构设计,NameNode管理元数据,DataNode存储实际数据
  • 数据可靠性: 通过数据副本和故障检测机制确保数据安全
  • 可扩展性: 支持水平扩展,可以轻松添加存储节点

关键特性

  • 大文件支持: 优化大文件存储和访问
  • 流式访问: 支持高吞吐量的数据访问模式
  • 容错机制: 自动检测和恢复节点故障
  • 跨平台: 支持多种操作系统和硬件平台

最佳实践要点

  1. 合理规划: 根据数据特征选择合适的块大小和副本策略
  2. 性能优化: 监控关键指标,及时调整配置参数
  3. 安全加固: 启用认证、授权和加密机制
  4. 运维管理: 建立完善的监控、备份和恢复流程

适用场景

  • 大数据分析: 存储和处理TB/PB级别的数据
  • 日志存储: 集中存储应用和系统日志
  • 数据归档: 长期保存历史数据
  • 内容分发: 支持大规模内容分发需求

HDFS为大数据处理奠定了坚实的存储基础,是构建现代数据平台不可或缺的组件。掌握HDFS的原理和最佳实践,对于大数据工程师来说至关重要。