2.1 Environment Preparation

2.1.1 System Requirements

Hardware requirements

  • CPU: 64-bit processor, 4 cores or more recommended
  • Memory: 8GB minimum, 16GB or more recommended
  • Storage: at least 100GB of free space, SSD recommended
  • Network: Gigabit Ethernet

Software requirements

  • Operating system: Linux (CentOS 7+, Ubuntu 18.04+, RHEL 7+)
  • Java: Oracle JDK 8 or OpenJDK 8/11
  • SSH: used for communication between cluster nodes
  • Python: 2.7+ or 3.6+ (optional, required by some tooling)
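
The following is a minimal pre-flight sketch that checks a single host against the minimums above; the thresholds are taken directly from this list and the host is assumed to be a Linux machine with GNU coreutils installed.

#!/bin/bash
# Minimal requirement check; thresholds mirror the minimums listed above.
cores=$(nproc)
mem_gb=$(awk '/MemTotal/ {printf "%d", $2/1024/1024}' /proc/meminfo)
disk_gb=$(df -BG --output=avail / | tail -1 | tr -dc '0-9')

[ "$cores" -ge 4 ]     && echo "OK   CPU cores: $cores"       || echo "WARN CPU cores: $cores (4+ recommended)"
[ "$mem_gb" -ge 8 ]    && echo "OK   Memory: ${mem_gb}GB"     || echo "WARN Memory: ${mem_gb}GB (8GB minimum)"
[ "$disk_gb" -ge 100 ] && echo "OK   Free disk: ${disk_gb}GB" || echo "WARN Free disk: ${disk_gb}GB (100GB minimum)"

if command -v java >/dev/null 2>&1; then
    java -version 2>&1 | head -1
else
    echo "WARN Java not found (JDK 8 or 11 required)"
fi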

2.1.2 Cluster Planning

Standalone Mode

  • All processes run inside a single JVM
  • Intended mainly for development and testing
  • Does not use HDFS; reads and writes the local filesystem directly
  • Minimal configuration, fast startup

Pseudo-Distributed Mode

  • All daemons run on a single machine
  • Uses HDFS and YARN
  • Simulates a distributed environment
  • Suitable for learning and small-scale testing

Fully Distributed Mode

  • Daemons are spread across multiple machines
  • A true distributed environment
  • The recommended mode for production
  • Provides high availability and fault tolerance
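
As a quick illustration of standalone mode, the sketch below runs the wordcount example that ships with Hadoop directly against the local filesystem; it assumes HADOOP_HOME is already set (see section 2.4) and that no daemons are running.

# Standalone-mode smoke test: no HDFS, no YARN, local files only.
mkdir -p ~/standalone-test/input
echo "hello hadoop hello world" > ~/standalone-test/input/words.txt

# The examples jar name varies with the Hadoop version; adjust the glob if needed.
$HADOOP_HOME/bin/hadoop jar \
    $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar \
    wordcount ~/standalone-test/input ~/standalone-test/output

cat ~/standalone-test/output/part-r-00000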

2.1.3 Node Role Planning

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hadoop集群节点规划工具
"""

from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum

class NodeType(Enum):
    """节点类型"""
    MASTER = "master"
    WORKER = "worker"
    EDGE = "edge"
    GATEWAY = "gateway"

class ServiceType(Enum):
    """服务类型"""
    NAMENODE = "namenode"
    SECONDARY_NAMENODE = "secondary_namenode"
    DATANODE = "datanode"
    RESOURCE_MANAGER = "resource_manager"
    NODE_MANAGER = "node_manager"
    HISTORY_SERVER = "history_server"
    ZOOKEEPER = "zookeeper"
    HIVE_METASTORE = "hive_metastore"
    SPARK_MASTER = "spark_master"
    SPARK_WORKER = "spark_worker"

@dataclass
class NodeSpec:
    """节点规格"""
    hostname: str
    ip_address: str
    cpu_cores: int
    memory_gb: int
    disk_gb: int
    node_type: NodeType
    services: List[ServiceType]
    
class ClusterPlanner:
    """集群规划器"""
    
    def __init__(self):
        self.nodes: List[NodeSpec] = []
        self.service_requirements = {
            ServiceType.NAMENODE: {'min_memory': 4, 'min_cores': 2, 'ha_support': True},
            ServiceType.DATANODE: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
            ServiceType.RESOURCE_MANAGER: {'min_memory': 4, 'min_cores': 2, 'ha_support': True},
            ServiceType.NODE_MANAGER: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
            ServiceType.SECONDARY_NAMENODE: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
            ServiceType.HISTORY_SERVER: {'min_memory': 1, 'min_cores': 1, 'ha_support': False},
            ServiceType.ZOOKEEPER: {'min_memory': 1, 'min_cores': 1, 'ha_support': True}
        }
    
    def add_node(self, hostname: str, ip_address: str, cpu_cores: int, 
                 memory_gb: int, disk_gb: int, node_type: NodeType) -> bool:
        """
        添加节点
        
        Args:
            hostname: 主机名
            ip_address: IP地址
            cpu_cores: CPU核心数
            memory_gb: 内存大小(GB)
            disk_gb: 磁盘大小(GB)
            node_type: 节点类型
            
        Returns:
            bool: 添加是否成功
        """
        try:
            node = NodeSpec(
                hostname=hostname,
                ip_address=ip_address,
                cpu_cores=cpu_cores,
                memory_gb=memory_gb,
                disk_gb=disk_gb,
                node_type=node_type,
                services=[]
            )
            self.nodes.append(node)
            print(f"节点添加成功: {hostname} ({ip_address})")
            return True
        except Exception as e:
            print(f"节点添加失败: {e}")
            return False
    
    def plan_services(self, enable_ha: bool = False) -> Dict[str, Any]:
        """
        规划服务分布
        
        Args:
            enable_ha: 是否启用高可用
            
        Returns:
            Dict: 服务分布规划
        """
        if not self.nodes:
            return {'error': '没有可用节点'}
        
        # 按节点类型分组
        master_nodes = [n for n in self.nodes if n.node_type == NodeType.MASTER]
        worker_nodes = [n for n in self.nodes if n.node_type == NodeType.WORKER]
        
        if not master_nodes:
            return {'error': '没有Master节点'}
        
        if not worker_nodes:
            return {'error': '没有Worker节点'}
        
        # 服务分布规划
        service_plan = {
            'master_services': {},
            'worker_services': {},
            'recommendations': [],
            'warnings': []
        }
        
        # Master节点服务分布
        primary_master = master_nodes[0]
        primary_master.services.extend([
            ServiceType.NAMENODE,
            ServiceType.RESOURCE_MANAGER,
            ServiceType.HISTORY_SERVER
        ])
        
        service_plan['master_services'][primary_master.hostname] = [
            s.value for s in primary_master.services
        ]
        
        # 高可用配置
        if enable_ha and len(master_nodes) >= 2:
            secondary_master = master_nodes[1]
            secondary_master.services.extend([
                ServiceType.NAMENODE,  # Standby NameNode
                ServiceType.RESOURCE_MANAGER  # Standby ResourceManager
            ])
            service_plan['master_services'][secondary_master.hostname] = [
                s.value for s in secondary_master.services
            ]
            
            # ZooKeeper集群(奇数个节点)
            zk_nodes = master_nodes[:3] if len(master_nodes) >= 3 else master_nodes
            for node in zk_nodes:
                if ServiceType.ZOOKEEPER not in node.services:
                    node.services.append(ServiceType.ZOOKEEPER)
                    if node.hostname not in service_plan['master_services']:
                        service_plan['master_services'][node.hostname] = []
                    service_plan['master_services'][node.hostname].append(ServiceType.ZOOKEEPER.value)
        else:
            # 非HA模式,添加Secondary NameNode
            if len(master_nodes) >= 2:
                secondary_master = master_nodes[1]
                secondary_master.services.append(ServiceType.SECONDARY_NAMENODE)
                service_plan['master_services'][secondary_master.hostname] = [
                    ServiceType.SECONDARY_NAMENODE.value
                ]
            else:
                primary_master.services.append(ServiceType.SECONDARY_NAMENODE)
        
        # Worker节点服务分布
        for worker in worker_nodes:
            worker.services.extend([
                ServiceType.DATANODE,
                ServiceType.NODE_MANAGER
            ])
            service_plan['worker_services'][worker.hostname] = [
                s.value for s in worker.services
            ]
        
        # 生成建议和警告
        self._generate_recommendations(service_plan, enable_ha)
        
        return service_plan
    
    def _generate_recommendations(self, service_plan: Dict[str, Any], enable_ha: bool):
        """
        生成建议和警告
        
        Args:
            service_plan: 服务规划
            enable_ha: 是否启用高可用
        """
        recommendations = []
        warnings = []
        
        # 检查节点数量
        master_count = len([n for n in self.nodes if n.node_type == NodeType.MASTER])
        worker_count = len([n for n in self.nodes if n.node_type == NodeType.WORKER])
        
        if master_count < 2 and enable_ha:
            warnings.append("高可用模式建议至少2个Master节点")
        
        if worker_count < 3:
            warnings.append("建议至少3个Worker节点以保证数据可靠性")
        
        # 检查资源配置
        for node in self.nodes:
            if node.node_type == NodeType.MASTER:
                if node.memory_gb < 8:
                    warnings.append(f"Master节点 {node.hostname} 内存不足,建议至少8GB")
                if node.cpu_cores < 4:
                    warnings.append(f"Master节点 {node.hostname} CPU核心数不足,建议至少4核")
            
            if node.node_type == NodeType.WORKER:
                if node.memory_gb < 4:
                    warnings.append(f"Worker节点 {node.hostname} 内存不足,建议至少4GB")
                if node.disk_gb < 100:
                    warnings.append(f"Worker节点 {node.hostname} 磁盘空间不足,建议至少100GB")
        
        # 生成建议
        recommendations.extend([
            "配置SSH免密登录",
            "同步所有节点时间",
            "配置防火墙规则",
            "设置合适的JVM堆大小",
            "配置数据目录权限"
        ])
        
        if enable_ha:
            recommendations.extend([
                "配置ZooKeeper集群",
                "设置共享存储(如NFS)用于NameNode元数据",
                "配置自动故障转移"
            ])
        
        service_plan['recommendations'] = recommendations
        service_plan['warnings'] = warnings
    
    def generate_hosts_file(self) -> str:
        """
        生成hosts文件内容
        
        Returns:
            str: hosts文件内容
        """
        hosts_content = "# Hadoop Cluster Hosts\n"
        for node in self.nodes:
            hosts_content += f"{node.ip_address}\t{node.hostname}\n"
        
        return hosts_content
    
    def generate_inventory(self) -> Dict[str, List[str]]:
        """
        生成Ansible inventory
        
        Returns:
            Dict: Ansible inventory配置
        """
        inventory = {
            'masters': [],
            'workers': [],
            'all': []
        }
        
        for node in self.nodes:
            inventory['all'].append(node.hostname)
            if node.node_type == NodeType.MASTER:
                inventory['masters'].append(node.hostname)
            elif node.node_type == NodeType.WORKER:
                inventory['workers'].append(node.hostname)
        
        return inventory
    
    def validate_cluster(self) -> Dict[str, Any]:
        """
        验证集群配置
        
        Returns:
            Dict: 验证结果
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': [],
            'summary': {}
        }
        
        # 检查基本要求
        if len(self.nodes) == 0:
            validation_result['valid'] = False
            validation_result['errors'].append("集群中没有节点")
            return validation_result
        
        master_nodes = [n for n in self.nodes if n.node_type == NodeType.MASTER]
        worker_nodes = [n for n in self.nodes if n.node_type == NodeType.WORKER]
        
        if len(master_nodes) == 0:
            validation_result['valid'] = False
            validation_result['errors'].append("集群中没有Master节点")
        
        if len(worker_nodes) == 0:
            validation_result['valid'] = False
            validation_result['errors'].append("集群中没有Worker节点")
        
        # 检查IP地址唯一性
        ip_addresses = [node.ip_address for node in self.nodes]
        if len(ip_addresses) != len(set(ip_addresses)):
            validation_result['valid'] = False
            validation_result['errors'].append("存在重复的IP地址")
        
        # 检查主机名唯一性
        hostnames = [node.hostname for node in self.nodes]
        if len(hostnames) != len(set(hostnames)):
            validation_result['valid'] = False
            validation_result['errors'].append("存在重复的主机名")
        
        # 生成摘要
        validation_result['summary'] = {
            'total_nodes': len(self.nodes),
            'master_nodes': len(master_nodes),
            'worker_nodes': len(worker_nodes),
            'total_memory': sum(node.memory_gb for node in self.nodes),
            'total_cores': sum(node.cpu_cores for node in self.nodes),
            'total_storage': sum(node.disk_gb for node in self.nodes)
        }
        
        return validation_result

# 使用示例
if __name__ == "__main__":
    # 创建集群规划器
    planner = ClusterPlanner()
    
    # 添加节点
    planner.add_node("master1", "192.168.1.10", 8, 16, 500, NodeType.MASTER)
    planner.add_node("master2", "192.168.1.11", 8, 16, 500, NodeType.MASTER)
    planner.add_node("worker1", "192.168.1.20", 4, 8, 1000, NodeType.WORKER)
    planner.add_node("worker2", "192.168.1.21", 4, 8, 1000, NodeType.WORKER)
    planner.add_node("worker3", "192.168.1.22", 4, 8, 1000, NodeType.WORKER)
    
    # 验证集群配置
    validation = planner.validate_cluster()
    print("=== 集群验证结果 ===")
    print(f"配置有效: {validation['valid']}")
    if validation['errors']:
        print("错误:")
        for error in validation['errors']:
            print(f"  - {error}")
    
    print(f"\n集群摘要:")
    summary = validation['summary']
    print(f"  总节点数: {summary['total_nodes']}")
    print(f"  Master节点: {summary['master_nodes']}")
    print(f"  Worker节点: {summary['worker_nodes']}")
    print(f"  总内存: {summary['total_memory']}GB")
    print(f"  总CPU核心: {summary['total_cores']}")
    print(f"  总存储: {summary['total_storage']}GB")
    
    # 规划服务分布
    service_plan = planner.plan_services(enable_ha=True)
    print("\n=== 服务分布规划 ===")
    print("Master节点服务:")
    for hostname, services in service_plan['master_services'].items():
        print(f"  {hostname}: {', '.join(services)}")
    
    print("\nWorker节点服务:")
    for hostname, services in service_plan['worker_services'].items():
        print(f"  {hostname}: {', '.join(services)}")
    
    if service_plan['warnings']:
        print("\n警告:")
        for warning in service_plan['warnings']:
            print(f"  - {warning}")
    
    print("\n建议:")
    for recommendation in service_plan['recommendations']:
        print(f"  - {recommendation}")
    
    # 生成hosts文件
    hosts_content = planner.generate_hosts_file()
    print("\n=== Hosts文件内容 ===")
    print(hosts_content)
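
The hosts entries printed above still have to be applied on every node. A minimal sketch, assuming the planner output was saved to /tmp/cluster-hosts (a placeholder path), is:

# Append the generated entries to /etc/hosts on the current node; repeat this on
# every node, or distribute the file with the SSH tooling from section 2.3.
sudo tee -a /etc/hosts < /tmp/cluster-hosts
getent hosts master1 worker1   # spot-check that the cluster names now resolve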

2.2 Java Environment Configuration

2.2.1 Installing Java

Option 1: install OpenJDK with the package manager

#!/bin/bash
# CentOS/RHEL系统
sudo yum update -y
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

# Ubuntu/Debian系统
sudo apt update
sudo apt install -y openjdk-8-jdk

# 验证安装
java -version
javac -version
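
If more than one JDK ends up installed side by side, the system default can be switched with the alternatives mechanism; run whichever pair of commands matches your distribution.

# CentOS/RHEL
sudo alternatives --config java
sudo alternatives --config javac

# Ubuntu/Debian
sudo update-alternatives --config java
sudo update-alternatives --config javac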

Option 2: install Oracle JDK manually

#!/bin/bash
# 下载Oracle JDK 8(需要Oracle账号)
# 或者使用已下载的JDK包

# 创建Java安装目录
sudo mkdir -p /opt/java

# 解压JDK
sudo tar -xzf jdk-8u291-linux-x64.tar.gz -C /opt/java/

# 创建符号链接
sudo ln -s /opt/java/jdk1.8.0_291 /opt/java/current

# 设置环境变量
echo 'export JAVA_HOME=/opt/java/current' | sudo tee -a /etc/profile
echo 'export PATH=$JAVA_HOME/bin:$PATH' | sudo tee -a /etc/profile

# 重新加载环境变量
source /etc/profile

# 验证安装
java -version
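
A quick way to confirm that the manually installed JDK is the one actually in use is to compare the resolved java binary with JAVA_HOME; both paths below should point into /opt/java/jdk1.8.0_291 via the symlink created above.

echo "JAVA_HOME=$JAVA_HOME"
readlink -f "$(which java)"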

2.2.2 Environment Variable Configuration

#!/bin/bash
# 创建Java环境配置脚本

cat > /tmp/setup_java_env.sh << 'EOF'
#!/bin/bash

# 检测Java安装路径
detect_java_home() {
    if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
        echo "JAVA_HOME已设置: $JAVA_HOME"
        return 0
    fi
    
    # 常见的Java安装路径
    JAVA_PATHS=(
        "/usr/lib/jvm/java-8-openjdk-amd64"
        "/usr/lib/jvm/java-1.8.0-openjdk"
        "/opt/java/current"
        "/usr/java/default"
        "/usr/java/latest"
    )
    
    for path in "${JAVA_PATHS[@]}"; do
        if [ -x "$path/bin/java" ]; then
            export JAVA_HOME="$path"
            echo "检测到Java安装路径: $JAVA_HOME"
            return 0
        fi
    done
    
    echo "错误: 未找到Java安装路径"
    return 1
}

# 配置环境变量
setup_java_env() {
    if ! detect_java_home; then
        exit 1
    fi
    
    # 创建环境变量配置文件
    cat > /tmp/java_env.sh << EOL
# Java Environment Variables
export JAVA_HOME=$JAVA_HOME
export JRE_HOME=\$JAVA_HOME/jre
export CLASSPATH=.:\$JAVA_HOME/lib:\$JRE_HOME/lib
export PATH=\$JAVA_HOME/bin:\$PATH
EOL
    
    # 添加到系统环境变量
    sudo cp /tmp/java_env.sh /etc/profile.d/
    sudo chmod +x /etc/profile.d/java_env.sh
    
    # 添加到用户环境变量
    if ! grep -q "JAVA_HOME" ~/.bashrc; then
        cat /tmp/java_env.sh >> ~/.bashrc
    fi
    
    # 重新加载环境变量
    source /etc/profile.d/java_env.sh
    source ~/.bashrc
    
    echo "Java环境变量配置完成"
}

# 验证Java环境
verify_java() {
    echo "=== Java环境验证 ==="
    echo "JAVA_HOME: $JAVA_HOME"
    echo "Java版本:"
    java -version
    echo "Javac版本:"
    javac -version
    
    # 检查Java版本是否为1.8
    JAVA_VERSION=$(java -version 2>&1 | grep "version" | awk '{print $3}' | sed 's/"//g')
    if [[ $JAVA_VERSION == 1.8* ]]; then
        echo "✓ Java版本符合要求 (1.8.x)"
    else
        echo "⚠ 警告: Java版本不是1.8.x,可能存在兼容性问题"
    fi
}

# 主函数
main() {
    echo "开始配置Java环境..."
    setup_java_env
    verify_java
    echo "Java环境配置完成!"
}

main
EOF

# 执行Java环境配置
chmod +x /tmp/setup_java_env.sh
/tmp/setup_java_env.sh
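
Hadoop resolves JAVA_HOME independently on every node, so once the script has been run everywhere it is worth verifying the setting cluster-wide. A minimal sketch, assuming the node names used in this chapter and the passwordless SSH access configured in section 2.3:

for node in master1 master2 worker1 worker2 worker3; do
    echo "== $node =="
    ssh "$node" 'source /etc/profile.d/java_env.sh 2>/dev/null; \
                 echo "JAVA_HOME=$JAVA_HOME"; java -version 2>&1 | head -1'
done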

2.3 Passwordless SSH Configuration

2.3.1 SSH Key Generation and Distribution

#!/bin/bash
# SSH免密登录配置脚本

# 配置参数
USER="hadoop"  # Hadoop用户
NODES=("master1" "master2" "worker1" "worker2" "worker3")  # 集群节点

# 生成SSH密钥对
generate_ssh_keys() {
    echo "生成SSH密钥对..."
    
    # 检查是否已存在密钥
    if [ -f ~/.ssh/id_rsa ]; then
        echo "SSH密钥已存在,跳过生成"
        return 0
    fi
    
    # 生成密钥对(无密码)
    ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa -N ""
    
    if [ $? -eq 0 ]; then
        echo "✓ SSH密钥生成成功"
    else
        echo "✗ SSH密钥生成失败"
        return 1
    fi
}

# 分发公钥到所有节点
distribute_public_key() {
    echo "分发公钥到集群节点..."
    
    for node in "${NODES[@]}"; do
        echo "正在配置节点: $node"
        
        # 复制公钥到目标节点
        ssh-copy-id -i ~/.ssh/id_rsa.pub $USER@$node
        
        if [ $? -eq 0 ]; then
            echo "✓ 公钥已成功复制到 $node"
        else
            echo "✗ 公钥复制到 $node 失败"
        fi
    done
}

# 测试SSH免密登录
test_ssh_connection() {
    echo "测试SSH免密登录..."
    
    for node in "${NODES[@]}"; do
        echo "测试连接到: $node"
        
        # 测试SSH连接
        ssh -o BatchMode=yes -o ConnectTimeout=5 $USER@$node 'echo "SSH连接成功"'
        
        if [ $? -eq 0 ]; then
            echo "✓ $node SSH免密登录正常"
        else
            echo "✗ $node SSH免密登录失败"
        fi
    done
}

# 配置SSH客户端
configure_ssh_client() {
    echo "配置SSH客户端..."
    
    # 创建SSH配置文件
    cat > ~/.ssh/config << EOF
# Hadoop Cluster SSH Configuration
Host *
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
    LogLevel ERROR
    ServerAliveInterval 60
    ServerAliveCountMax 3
    
EOF
    
    # 为每个节点添加配置
    for node in "${NODES[@]}"; do
        cat >> ~/.ssh/config << EOF
Host $node
    HostName $node
    User $USER
    IdentityFile ~/.ssh/id_rsa
    
EOF
    done
    
    # 设置权限
    chmod 600 ~/.ssh/config
    
    echo "✓ SSH客户端配置完成"
}

# 主函数
main() {
    echo "=== 开始配置SSH免密登录 ==="
    
    # 确保.ssh目录存在
    mkdir -p ~/.ssh
    chmod 700 ~/.ssh
    
    # 执行配置步骤
    generate_ssh_keys
    configure_ssh_client
    distribute_public_key
    test_ssh_connection
    
    echo "=== SSH免密登录配置完成 ==="
}

# 检查是否以正确用户运行
if [ "$(whoami)" != "$USER" ]; then
    echo "请以 $USER 用户运行此脚本"
    exit 1
fi

main
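
Note that the script above turns off StrictHostKeyChecking for convenience. Outside of a lab environment, a safer alternative is to pre-populate known_hosts and leave host key checking enabled; a sketch of that approach, using the same node names as above:

for node in master1 master2 worker1 worker2 worker3; do
    ssh-keyscan -H "$node" >> ~/.ssh/known_hosts 2>/dev/null
done
sort -u ~/.ssh/known_hosts -o ~/.ssh/known_hosts
chmod 600 ~/.ssh/known_hosts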

2.3.2 Batch SSH Operations Tool

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量SSH操作工具
用于在Hadoop集群中批量执行命令
"""

import subprocess
import threading
import time
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

class SSHManager:
    """SSH管理器"""
    
    def __init__(self, nodes: List[str], user: str = "hadoop", 
                 timeout: int = 30, max_workers: int = 10):
        """
        初始化SSH管理器
        
        Args:
            nodes: 节点列表
            user: SSH用户名
            timeout: 连接超时时间
            max_workers: 最大并发数
        """
        self.nodes = nodes
        self.user = user
        self.timeout = timeout
        self.max_workers = max_workers
    
    def execute_command(self, node: str, command: str) -> Dict[str, Any]:
        """
        在单个节点执行命令
        
        Args:
            node: 目标节点
            command: 要执行的命令
            
        Returns:
            Dict: 执行结果
        """
        try:
            # 构建SSH命令
            ssh_cmd = [
                "ssh",
                "-o", "BatchMode=yes",
                "-o", "ConnectTimeout=5",
                "-o", "StrictHostKeyChecking=no",
                f"{self.user}@{node}",
                command
            ]
            
            # 执行命令
            start_time = time.time()
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            end_time = time.time()
            
            return {
                'node': node,
                'command': command,
                'success': result.returncode == 0,
                'return_code': result.returncode,
                'stdout': result.stdout.strip(),
                'stderr': result.stderr.strip(),
                'execution_time': round(end_time - start_time, 2)
            }
            
        except subprocess.TimeoutExpired:
            return {
                'node': node,
                'command': command,
                'success': False,
                'return_code': -1,
                'stdout': '',
                'stderr': f'Command timeout after {self.timeout} seconds',
                'execution_time': self.timeout
            }
        except Exception as e:
            return {
                'node': node,
                'command': command,
                'success': False,
                'return_code': -1,
                'stdout': '',
                'stderr': str(e),
                'execution_time': 0
            }
    
    def execute_on_all(self, command: str, parallel: bool = True) -> List[Dict[str, Any]]:
        """
        在所有节点执行命令
        
        Args:
            command: 要执行的命令
            parallel: 是否并行执行
            
        Returns:
            List[Dict]: 所有节点的执行结果
        """
        results = []
        
        if parallel:
            # 并行执行
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_node = {
                    executor.submit(self.execute_command, node, command): node 
                    for node in self.nodes
                }
                
                for future in as_completed(future_to_node):
                    result = future.result()
                    results.append(result)
        else:
            # 串行执行
            for node in self.nodes:
                result = self.execute_command(node, command)
                results.append(result)
        
        # 按节点名排序
        results.sort(key=lambda x: x['node'])
        return results
    
    def copy_file(self, local_path: str, remote_path: str, 
                  nodes: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """
        复制文件到节点
        
        Args:
            local_path: 本地文件路径
            remote_path: 远程文件路径
            nodes: 目标节点列表,默认为所有节点
            
        Returns:
            List[Dict]: 复制结果
        """
        target_nodes = nodes or self.nodes
        results = []
        
        def copy_to_node(node: str) -> Dict[str, Any]:
            try:
                scp_cmd = [
                    "scp",
                    "-o", "BatchMode=yes",
                    "-o", "ConnectTimeout=5",
                    "-o", "StrictHostKeyChecking=no",
                    local_path,
                    f"{self.user}@{node}:{remote_path}"
                ]
                
                start_time = time.time()
                result = subprocess.run(
                    scp_cmd,
                    capture_output=True,
                    text=True,
                    timeout=self.timeout
                )
                end_time = time.time()
                
                return {
                    'node': node,
                    'operation': 'copy_file',
                    'local_path': local_path,
                    'remote_path': remote_path,
                    'success': result.returncode == 0,
                    'stderr': result.stderr.strip(),
                    'execution_time': round(end_time - start_time, 2)
                }
                
            except Exception as e:
                return {
                    'node': node,
                    'operation': 'copy_file',
                    'local_path': local_path,
                    'remote_path': remote_path,
                    'success': False,
                    'stderr': str(e),
                    'execution_time': 0
                }
        
        # 并行复制
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_node = {
                executor.submit(copy_to_node, node): node 
                for node in target_nodes
            }
            
            for future in as_completed(future_to_node):
                result = future.result()
                results.append(result)
        
        results.sort(key=lambda x: x['node'])
        return results
    
    def check_connectivity(self) -> Dict[str, Any]:
        """
        检查所有节点的连通性
        
        Returns:
            Dict: 连通性检查结果
        """
        results = self.execute_on_all("echo 'SSH connection test'")
        
        summary = {
            'total_nodes': len(self.nodes),
            'reachable_nodes': 0,
            'unreachable_nodes': 0,
            'node_status': {},
            'unreachable_list': []
        }
        
        for result in results:
            node = result['node']
            if result['success']:
                summary['reachable_nodes'] += 1
                summary['node_status'][node] = 'reachable'
            else:
                summary['unreachable_nodes'] += 1
                summary['node_status'][node] = 'unreachable'
                summary['unreachable_list'].append(node)
        
        return summary
    
    def get_system_info(self) -> List[Dict[str, Any]]:
        """
        获取所有节点的系统信息
        
        Returns:
            List[Dict]: 系统信息
        """
        commands = {
            'hostname': 'hostname',
            'os_release': 'cat /etc/os-release | grep PRETTY_NAME | cut -d"=" -f2 | tr -d \'"\' || echo "Unknown"',
            'kernel': 'uname -r',
            'cpu_info': 'lscpu | grep "Model name" | cut -d":" -f2 | xargs || echo "Unknown"',
            'cpu_cores': 'nproc',
            'memory_total': 'free -h | grep Mem | awk "{print \$2}"',
            'memory_available': 'free -h | grep Mem | awk "{print \$7}"',
            'disk_usage': 'df -h / | tail -1 | awk "{print \$3\"/\"\$2\" (\"\$5\")\"}"',
            'uptime': 'uptime | awk "{print \$3\" \"\$4}" | sed "s/,//"',
            'java_version': 'java -version 2>&1 | head -1 | cut -d\'"\' -f2 || echo "Not installed"'
        }
        
        system_info = []
        
        for node in self.nodes:
            node_info = {'node': node}
            
            for key, command in commands.items():
                result = self.execute_command(node, command)
                if result['success']:
                    node_info[key] = result['stdout']
                else:
                    node_info[key] = 'Error: ' + result['stderr']
            
            system_info.append(node_info)
        
        return system_info
    
    def print_results(self, results: List[Dict[str, Any]], show_details: bool = True):
        """
        打印执行结果
        
        Args:
            results: 执行结果列表
            show_details: 是否显示详细信息
        """
        print(f"\n=== 执行结果 ({len(results)}个节点) ===")
        
        success_count = sum(1 for r in results if r['success'])
        failure_count = len(results) - success_count
        
        print(f"成功: {success_count}, 失败: {failure_count}")
        
        if show_details:
            for result in results:
                status = "✓" if result['success'] else "✗"
                print(f"\n{status} {result['node']}:")
                
                if result['success'] and result.get('stdout'):
                    print(f"  输出: {result['stdout']}")
                
                if not result['success'] and result.get('stderr'):
                    print(f"  错误: {result['stderr']}")
                
                if 'execution_time' in result:
                    print(f"  耗时: {result['execution_time']}秒")

# 使用示例
if __name__ == "__main__":
    # 定义集群节点
    nodes = ["master1", "master2", "worker1", "worker2", "worker3"]
    
    # 创建SSH管理器
    ssh_manager = SSHManager(nodes, user="hadoop")
    
    # 检查连通性
    print("=== 检查节点连通性 ===")
    connectivity = ssh_manager.check_connectivity()
    print(f"可达节点: {connectivity['reachable_nodes']}/{connectivity['total_nodes']}")
    
    if connectivity['unreachable_list']:
        print(f"不可达节点: {', '.join(connectivity['unreachable_list'])}")
    
    # 获取系统信息
    print("\n=== 获取系统信息 ===")
    system_info = ssh_manager.get_system_info()
    
    for info in system_info:
        print(f"\n节点: {info['node']}")
        print(f"  主机名: {info['hostname']}")
        print(f"  操作系统: {info['os_release']}")
        print(f"  内核版本: {info['kernel']}")
        print(f"  CPU: {info['cpu_info']} ({info['cpu_cores']} cores)")
        print(f"  内存: {info['memory_available']}/{info['memory_total']}")
        print(f"  磁盘使用: {info['disk_usage']}")
        print(f"  运行时间: {info['uptime']}")
        print(f"  Java版本: {info['java_version']}")
    
    # 执行批量命令
    print("\n=== 执行批量命令 ===")
    results = ssh_manager.execute_on_all("date")
    ssh_manager.print_results(results)

2.4 Hadoop Download and Installation

2.4.1 Downloading Hadoop

#!/bin/bash
# Hadoop下载和安装脚本

# 配置参数
HADOOP_VERSION="3.3.4"
HADOOP_USER="hadoop"
HADOOP_HOME="/opt/hadoop"
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
HADOOP_DATA_DIR="/data/hadoop"
HADOOP_LOG_DIR="/var/log/hadoop"

# 下载Hadoop
download_hadoop() {
    echo "下载Hadoop $HADOOP_VERSION..."
    
    # 创建临时下载目录
    mkdir -p /tmp/hadoop-install
    cd /tmp/hadoop-install
    
    # 下载Hadoop二进制包
    HADOOP_URL="https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz"
    
    if [ ! -f "hadoop-$HADOOP_VERSION.tar.gz" ]; then
        echo "正在下载: $HADOOP_URL"
        wget "$HADOOP_URL" || {
            echo "下载失败,尝试使用镜像站点"
            # 使用清华大学镜像
            MIRROR_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz"
            wget "$MIRROR_URL" || {
                echo "下载失败,请手动下载Hadoop包"
                return 1
            }
        }
    else
        echo "Hadoop包已存在,跳过下载"
    fi
    
    # 验证下载的文件
    if [ -f "hadoop-$HADOOP_VERSION.tar.gz" ]; then
        echo "✓ Hadoop下载完成"
        return 0
    else
        echo "✗ Hadoop下载失败"
        return 1
    fi
}

# 创建Hadoop用户
create_hadoop_user() {
    echo "创建Hadoop用户..."
    
    # 检查用户是否已存在
    if id "$HADOOP_USER" &>/dev/null; then
        echo "用户 $HADOOP_USER 已存在"
        return 0
    fi
    
    # 创建用户组
    sudo groupadd hadoop 2>/dev/null || true
    
    # 创建用户
    sudo useradd -g hadoop -m -s /bin/bash "$HADOOP_USER"
    
    # 设置密码(可选)
    echo "请为用户 $HADOOP_USER 设置密码:"
    sudo passwd "$HADOOP_USER"
    
    # 添加sudo权限
    echo "$HADOOP_USER ALL=(ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/hadoop
    
    echo "✓ Hadoop用户创建完成"
}

# 安装Hadoop
install_hadoop() {
    echo "安装Hadoop..."
    
    # Create the data and log directories. Do not pre-create $HADOOP_HOME:
    # it is created below as a symlink to the unpacked release.
    sudo mkdir -p "$HADOOP_DATA_DIR"
    sudo mkdir -p "$HADOOP_LOG_DIR"
    
    # 解压Hadoop
    echo "解压Hadoop到 $HADOOP_HOME"
    sudo tar -xzf "/tmp/hadoop-install/hadoop-$HADOOP_VERSION.tar.gz" -C /opt/
    
    # Create the symlink (-n prevents the link from being created inside an
    # already existing $HADOOP_HOME directory)
    sudo ln -sfn "/opt/hadoop-$HADOOP_VERSION" "$HADOOP_HOME"
    
    # 设置目录权限
    sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_HOME"
    sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_DATA_DIR"
    sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_LOG_DIR"
    
    # 设置目录权限
    sudo chmod 755 "$HADOOP_HOME"
    sudo chmod 755 "$HADOOP_DATA_DIR"
    sudo chmod 755 "$HADOOP_LOG_DIR"
    
    echo "✓ Hadoop安装完成"
}

# 配置环境变量
setup_environment() {
    echo "配置Hadoop环境变量..."
    
    # 创建Hadoop环境变量文件
    cat > /tmp/hadoop_env.sh << EOF
# Hadoop Environment Variables
export HADOOP_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_CONF_DIR
export HADOOP_DATA_DIR=$HADOOP_DATA_DIR
export HADOOP_LOG_DIR=$HADOOP_LOG_DIR
export HADOOP_CLASSPATH=\$JAVA_HOME/lib/tools.jar
export PATH=\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:\$PATH

# HDFS Environment
export HDFS_NAMENODE_USER=$HADOOP_USER
export HDFS_DATANODE_USER=$HADOOP_USER
export HDFS_SECONDARYNAMENODE_USER=$HADOOP_USER

# YARN Environment
export YARN_RESOURCEMANAGER_USER=$HADOOP_USER
export YARN_NODEMANAGER_USER=$HADOOP_USER
EOF
    
    # 添加到系统环境变量
    sudo cp /tmp/hadoop_env.sh /etc/profile.d/
    sudo chmod +x /etc/profile.d/hadoop_env.sh
    
    # 添加到Hadoop用户环境变量
    sudo -u "$HADOOP_USER" bash -c "cat /tmp/hadoop_env.sh >> /home/$HADOOP_USER/.bashrc"
    
    echo "✓ 环境变量配置完成"
}

# 验证安装
verify_installation() {
    echo "验证Hadoop安装..."
    
    # 切换到Hadoop用户执行验证
    sudo -u "$HADOOP_USER" bash -c '
        source ~/.bashrc
        
        echo "=== Hadoop安装验证 ==="
        echo "HADOOP_HOME: $HADOOP_HOME"
        echo "Hadoop版本:"
        $HADOOP_HOME/bin/hadoop version
        
        echo -e "\nHDFS版本:"
        $HADOOP_HOME/bin/hdfs version
        
        echo -e "\nYARN版本:"
        $HADOOP_HOME/bin/yarn version
        
        echo -e "\nMapReduce版本:"
        $HADOOP_HOME/bin/mapred version
    '
    
    if [ $? -eq 0 ]; then
        echo "✓ Hadoop安装验证成功"
    else
        echo "✗ Hadoop安装验证失败"
        return 1
    fi
}

# 清理临时文件
cleanup() {
    echo "清理临时文件..."
    rm -rf /tmp/hadoop-install
    rm -f /tmp/hadoop_env.sh
    echo "✓ 清理完成"
}

# 主函数
main() {
    echo "=== 开始安装Hadoop $HADOOP_VERSION ==="
    
    # 检查是否以root用户运行
    if [ "$EUID" -ne 0 ]; then
        echo "请以root用户运行此脚本"
        exit 1
    fi
    
    # 执行安装步骤
    download_hadoop || exit 1
    create_hadoop_user || exit 1
    install_hadoop || exit 1
    setup_environment || exit 1
    verify_installation || exit 1
    cleanup
    
    echo "=== Hadoop安装完成 ==="
    echo "请使用以下命令切换到Hadoop用户:"
    echo "su - $HADOOP_USER"
    echo "然后执行: source ~/.bashrc"
}

main
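
The download step above does not verify the archive. Apache publishes a SHA-512 checksum next to every release, so a hedged addition is to check the tarball before it is unpacked; the snippet assumes the same HADOOP_VERSION and download directory as the script above, and if the published .sha512 file uses a BSD-style format, compare the printed digests manually instead of relying on -c.

cd /tmp/hadoop-install
wget -q "https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz.sha512"
sha512sum -c "hadoop-$HADOOP_VERSION.tar.gz.sha512" || \
    echo "Checksum mismatch or format difference - verify manually before installing"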

2.4.2 Directory Layout

#!/bin/bash
# Hadoop目录结构说明和创建脚本

# 显示Hadoop目录结构
show_hadoop_structure() {
    echo "=== Hadoop目录结构 ==="
    
    cat << 'EOF'
Hadoop安装目录结构:

/opt/hadoop/                    # Hadoop安装根目录
├── bin/                        # 可执行文件目录
│   ├── hadoop                  # Hadoop主命令
│   ├── hdfs                    # HDFS命令
│   ├── yarn                    # YARN命令
│   └── mapred                  # MapReduce命令
├── sbin/                       # 系统管理脚本
│   ├── start-dfs.sh           # 启动HDFS
│   ├── stop-dfs.sh            # 停止HDFS
│   ├── start-yarn.sh          # 启动YARN
│   ├── stop-yarn.sh           # 停止YARN
│   └── start-all.sh           # 启动所有服务
├── etc/hadoop/                 # 配置文件目录
│   ├── core-site.xml          # 核心配置
│   ├── hdfs-site.xml          # HDFS配置
│   ├── yarn-site.xml          # YARN配置
│   ├── mapred-site.xml        # MapReduce配置
│   ├── hadoop-env.sh          # 环境变量配置
│   ├── workers                # Worker节点列表
│   └── log4j.properties       # 日志配置
├── lib/                        # 库文件目录
├── libexec/                    # 内部脚本目录
├── logs/                       # 日志文件目录
├── share/                      # 共享文件目录
│   ├── hadoop/                # Hadoop核心JAR包
│   └── doc/                   # 文档目录
└── include/                    # 头文件目录

数据目录结构:

/data/hadoop/                   # Hadoop数据根目录
├── hdfs/                       # HDFS数据目录
│   ├── namenode/              # NameNode数据目录
│   ├── datanode/              # DataNode数据目录
│   └── secondary/             # Secondary NameNode数据目录
├── yarn/                       # YARN数据目录
│   ├── local/                 # 本地目录
│   └── logs/                  # 应用日志目录
└── tmp/                        # 临时目录

日志目录结构:

/var/log/hadoop/                # Hadoop日志根目录
├── hdfs/                       # HDFS日志
├── yarn/                       # YARN日志
├── mapreduce/                  # MapReduce日志
└── audit/                      # 审计日志
EOF
}

# 创建Hadoop数据目录
create_data_directories() {
    echo "创建Hadoop数据目录..."
    
    local hadoop_user="hadoop"
    local data_dirs=(
        "/data/hadoop"
        "/data/hadoop/hdfs"
        "/data/hadoop/hdfs/namenode"
        "/data/hadoop/hdfs/datanode"
        "/data/hadoop/hdfs/secondary"
        "/data/hadoop/yarn"
        "/data/hadoop/yarn/local"
        "/data/hadoop/yarn/logs"
        "/data/hadoop/tmp"
    )
    
    local log_dirs=(
        "/var/log/hadoop"
        "/var/log/hadoop/hdfs"
        "/var/log/hadoop/yarn"
        "/var/log/hadoop/mapreduce"
        "/var/log/hadoop/audit"
    )
    
    # 创建数据目录
    for dir in "${data_dirs[@]}"; do
        sudo mkdir -p "$dir"
        sudo chown "$hadoop_user:hadoop" "$dir"
        sudo chmod 755 "$dir"
        echo "✓ 创建目录: $dir"
    done
    
    # 创建日志目录
    for dir in "${log_dirs[@]}"; do
        sudo mkdir -p "$dir"
        sudo chown "$hadoop_user:hadoop" "$dir"
        sudo chmod 755 "$dir"
        echo "✓ 创建目录: $dir"
    done
    
    echo "✓ 数据目录创建完成"
}

# 设置目录权限
set_directory_permissions() {
    echo "设置目录权限..."
    
    local hadoop_user="hadoop"
    
    # 设置Hadoop安装目录权限
    sudo chown -R "$hadoop_user:hadoop" /opt/hadoop*
    sudo chmod -R 755 /opt/hadoop*/bin
    sudo chmod -R 755 /opt/hadoop*/sbin
    sudo chmod -R 644 /opt/hadoop*/etc/hadoop/*
    sudo chmod 755 /opt/hadoop*/etc/hadoop
    
    # 设置数据目录权限
    sudo chown -R "$hadoop_user:hadoop" /data/hadoop
    sudo chmod -R 755 /data/hadoop
    
    # 设置日志目录权限
    sudo chown -R "$hadoop_user:hadoop" /var/log/hadoop
    sudo chmod -R 755 /var/log/hadoop
    
    echo "✓ 目录权限设置完成"
}

# 创建符号链接
create_symlinks() {
    echo "创建符号链接..."
    
    # 创建命令符号链接到系统PATH
    local commands=("hadoop" "hdfs" "yarn" "mapred")
    
    for cmd in "${commands[@]}"; do
        if [ ! -L "/usr/local/bin/$cmd" ]; then
            sudo ln -s "/opt/hadoop/bin/$cmd" "/usr/local/bin/$cmd"
            echo "✓ 创建符号链接: /usr/local/bin/$cmd"
        fi
    done
    
    echo "✓ 符号链接创建完成"
}

# 验证目录结构
verify_directory_structure() {
    echo "验证目录结构..."
    
    local required_dirs=(
        "/opt/hadoop"
        "/opt/hadoop/bin"
        "/opt/hadoop/sbin"
        "/opt/hadoop/etc/hadoop"
        "/data/hadoop"
        "/data/hadoop/hdfs/namenode"
        "/data/hadoop/hdfs/datanode"
        "/var/log/hadoop"
    )
    
    local missing_dirs=()
    
    for dir in "${required_dirs[@]}"; do
        if [ ! -d "$dir" ]; then
            missing_dirs+=("$dir")
        fi
    done
    
    if [ ${#missing_dirs[@]} -eq 0 ]; then
        echo "✓ 目录结构验证通过"
        return 0
    else
        echo "✗ 以下目录缺失:"
        for dir in "${missing_dirs[@]}"; do
            echo "  - $dir"
        done
        return 1
    fi
}

# 主函数
main() {
    echo "=== Hadoop目录结构配置 ==="
    
    show_hadoop_structure
    echo
    create_data_directories
    set_directory_permissions
    create_symlinks
    verify_directory_structure
    
    echo "=== 目录结构配置完成 ==="
}

main

2.5 Basic Configuration Files

2.5.1 Core Configuration Files

Hadoop's configuration files live under $HADOOP_HOME/etc/hadoop/ and mainly include:

  • core-site.xml: core settings; defines the default filesystem and RPC behavior
  • hdfs-site.xml: HDFS settings; defines NameNode and DataNode behavior
  • yarn-site.xml: YARN settings; defines ResourceManager and NodeManager behavior
  • mapred-site.xml: MapReduce settings; defines how jobs are executed
  • hadoop-env.sh: environment variable settings
  • workers: list of worker nodes
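
All four XML files share the same <configuration>/<property> layout. A minimal hand-written core-site.xml (the hostname and paths are the values assumed throughout this chapter) looks like the file written below; xmllint can confirm it is well-formed before it is deployed.

cat > /tmp/core-site.minimal.xml << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master1:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/data/hadoop/tmp</value>
  </property>
</configuration>
EOF
xmllint --noout /tmp/core-site.minimal.xml && echo "core-site.minimal.xml is well-formed"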

2.5.2 Configuration File Generator

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hadoop配置文件生成工具
"""

import os
import xml.etree.ElementTree as ET
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path

@dataclass
class HadoopConfig:
    """Hadoop配置参数"""
    cluster_name: str = "hadoop-cluster"
    namenode_host: str = "master1"
    namenode_port: int = 9000
    namenode_http_port: int = 9870
    secondary_namenode_host: str = "master2"
    resource_manager_host: str = "master1"
    resource_manager_port: int = 8032
    resource_manager_webapp_port: int = 8088
    history_server_host: str = "master1"
    history_server_port: int = 19888
    
    # 目录配置
    hadoop_tmp_dir: str = "/data/hadoop/tmp"
    namenode_dir: str = "/data/hadoop/hdfs/namenode"
    datanode_dir: str = "/data/hadoop/hdfs/datanode"
    secondary_namenode_dir: str = "/data/hadoop/hdfs/secondary"
    yarn_local_dir: str = "/data/hadoop/yarn/local"
    yarn_log_dir: str = "/data/hadoop/yarn/logs"
    
    # 内存配置(MB)
    namenode_heap_size: int = 2048
    datanode_heap_size: int = 1024
    resource_manager_heap_size: int = 2048
    node_manager_heap_size: int = 1024
    
    # 副本数配置
    replication_factor: int = 3
    
    # Worker节点列表
    worker_nodes: List[str] = None
    
    def __post_init__(self):
        if self.worker_nodes is None:
            self.worker_nodes = ["worker1", "worker2", "worker3"]

class HadoopConfigGenerator:
    """Hadoop配置文件生成器"""
    
    def __init__(self, config: HadoopConfig):
        self.config = config
    
    def _create_xml_element(self, name: str, value: str, description: str = "") -> ET.Element:
        """
        创建XML配置元素
        
        Args:
            name: 配置项名称
            value: 配置项值
            description: 配置项描述
            
        Returns:
            ET.Element: XML元素
        """
        property_elem = ET.Element("property")
        
        name_elem = ET.SubElement(property_elem, "name")
        name_elem.text = name
        
        value_elem = ET.SubElement(property_elem, "value")
        value_elem.text = str(value)
        
        if description:
            desc_elem = ET.SubElement(property_elem, "description")
            desc_elem.text = description
        
        return property_elem
    
    def _format_xml(self, root: ET.Element) -> str:
        """
        格式化XML输出
        
        Args:
            root: XML根元素
            
        Returns:
            str: 格式化的XML字符串
        """
        # 添加XML声明和注释
        xml_str = '<?xml version="1.0" encoding="UTF-8"?>\n'
        xml_str += '<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>\n'
        xml_str += '<!--\n'
        xml_str += f'  Hadoop Configuration for {self.config.cluster_name}\n'
        xml_str += '  Generated by HadoopConfigGenerator\n'
        xml_str += '-->\n'
        
        # 转换为字符串并格式化
        rough_string = ET.tostring(root, encoding='unicode')
        
        # 简单的格式化(添加缩进)
        lines = rough_string.split('>')
        formatted_lines = []
        indent_level = 0
        
        for line in lines[:-1]:  # 最后一个元素是空的
            line = line + '>'
            
            # 处理缩进
            if line.strip().startswith('</'):
                indent_level -= 1
            
            formatted_lines.append('  ' * indent_level + line.strip())
            
            if not line.strip().startswith('</') and not line.strip().endswith('/>'):
                if '<' in line and not line.strip().startswith('<?'):
                    indent_level += 1
        
        xml_str += '\n'.join(formatted_lines)
        return xml_str
    
    def generate_core_site(self) -> str:
        """
        生成core-site.xml配置
        
        Returns:
            str: core-site.xml内容
        """
        root = ET.Element("configuration")
        
        # 核心配置项
        properties = [
            ("fs.defaultFS", f"hdfs://{self.config.namenode_host}:{self.config.namenode_port}", 
             "默认文件系统URI"),
            ("hadoop.tmp.dir", self.config.hadoop_tmp_dir, 
             "Hadoop临时目录"),
            ("hadoop.http.staticuser.user", "hadoop", 
             "Web界面静态用户"),
            ("fs.trash.interval", "1440", 
             "回收站保留时间(分钟)"),
            ("io.file.buffer.size", "131072", 
             "文件缓冲区大小"),
            ("hadoop.security.authorization", "false", 
             "是否启用安全授权"),
            ("hadoop.security.authentication", "simple", 
             "认证方式"),
        ]
        
        for name, value, desc in properties:
            root.append(self._create_xml_element(name, value, desc))
        
        return self._format_xml(root)
    
    def generate_hdfs_site(self) -> str:
        """
        生成hdfs-site.xml配置
        
        Returns:
            str: hdfs-site.xml内容
        """
        root = ET.Element("configuration")
        
        # HDFS配置项
        properties = [
            ("dfs.namenode.name.dir", self.config.namenode_dir, 
             "NameNode数据目录"),
            ("dfs.datanode.data.dir", self.config.datanode_dir, 
             "DataNode数据目录"),
            ("dfs.namenode.checkpoint.dir", self.config.secondary_namenode_dir, 
             "Secondary NameNode检查点目录"),
            ("dfs.replication", str(self.config.replication_factor), 
             "数据副本数"),
            ("dfs.blocksize", "134217728", 
             "HDFS块大小(128MB)"),
            ("dfs.namenode.handler.count", "20", 
             "NameNode处理线程数"),
            ("dfs.datanode.handler.count", "10", 
             "DataNode处理线程数"),
            ("dfs.namenode.http-address", f"{self.config.namenode_host}:{self.config.namenode_http_port}", 
             "NameNode Web界面地址"),
            ("dfs.namenode.secondary.http-address", f"{self.config.secondary_namenode_host}:9868", 
             "Secondary NameNode Web界面地址"),
            ("dfs.webhdfs.enabled", "true", 
             "启用WebHDFS"),
            ("dfs.permissions.enabled", "false", 
             "禁用权限检查(开发环境)"),
            ("dfs.namenode.safemode.threshold-pct", "0.9", 
             "安全模式阈值"),
            ("dfs.datanode.max.transfer.threads", "4096", 
             "DataNode最大传输线程数"),
        ]
        
        for name, value, desc in properties:
            root.append(self._create_xml_element(name, value, desc))
        
        return self._format_xml(root)
    
    def generate_yarn_site(self) -> str:
        """
        生成yarn-site.xml配置
        
        Returns:
            str: yarn-site.xml内容
        """
        root = ET.Element("configuration")
        
        # YARN配置项
        properties = [
            ("yarn.resourcemanager.hostname", self.config.resource_manager_host, 
             "ResourceManager主机名"),
            ("yarn.resourcemanager.address", f"{self.config.resource_manager_host}:{self.config.resource_manager_port}", 
             "ResourceManager地址"),
            ("yarn.resourcemanager.webapp.address", f"{self.config.resource_manager_host}:{self.config.resource_manager_webapp_port}", 
             "ResourceManager Web界面地址"),
            ("yarn.nodemanager.aux-services", "mapreduce_shuffle", 
             "NodeManager辅助服务"),
            ("yarn.nodemanager.aux-services.mapreduce_shuffle.class", 
             "org.apache.hadoop.mapred.ShuffleHandler", 
             "Shuffle服务类"),
            ("yarn.nodemanager.local-dirs", self.config.yarn_local_dir, 
             "NodeManager本地目录"),
            ("yarn.nodemanager.log-dirs", self.config.yarn_log_dir, 
             "NodeManager日志目录"),
            ("yarn.nodemanager.resource.memory-mb", "4096", 
             "NodeManager可用内存(MB)"),
            ("yarn.nodemanager.resource.cpu-vcores", "4", 
             "NodeManager可用CPU核心数"),
            ("yarn.scheduler.maximum-allocation-mb", "4096", 
             "单个容器最大内存(MB)"),
            ("yarn.scheduler.minimum-allocation-mb", "512", 
             "单个容器最小内存(MB)"),
            ("yarn.app.mapreduce.am.resource.mb", "1024", 
             "ApplicationMaster内存(MB)"),
            ("yarn.log-aggregation-enable", "true", 
             "启用日志聚合"),
            ("yarn.log.server.url", f"http://{self.config.history_server_host}:{self.config.history_server_port}/jobhistory/logs", 
             "日志服务器URL"),
        ]
        
        for name, value, desc in properties:
            root.append(self._create_xml_element(name, value, desc))
        
        return self._format_xml(root)
    
    def generate_mapred_site(self) -> str:
        """
        生成mapred-site.xml配置
        
        Returns:
            str: mapred-site.xml内容
        """
        root = ET.Element("configuration")
        
        # MapReduce配置项
        properties = [
            ("mapreduce.framework.name", "yarn", 
             "MapReduce框架名称"),
            ("mapreduce.jobhistory.address", f"{self.config.history_server_host}:10020", 
             "JobHistory服务地址"),
            ("mapreduce.jobhistory.webapp.address", f"{self.config.history_server_host}:{self.config.history_server_port}", 
             "JobHistory Web界面地址"),
            ("mapreduce.map.memory.mb", "1024", 
             "Map任务内存(MB)"),
            ("mapreduce.reduce.memory.mb", "2048", 
             "Reduce任务内存(MB)"),
            ("mapreduce.map.java.opts", "-Xmx819m", 
             "Map任务JVM参数"),
            ("mapreduce.reduce.java.opts", "-Xmx1638m", 
             "Reduce任务JVM参数"),
            ("mapreduce.task.io.sort.mb", "256", 
             "排序缓冲区大小(MB)"),
            ("mapreduce.task.io.sort.factor", "100", 
             "排序合并因子"),
            ("mapreduce.reduce.shuffle.parallelcopies", "10", 
             "Shuffle并行复制数"),
        ]
        
        for name, value, desc in properties:
            root.append(self._create_xml_element(name, value, desc))
        
        return self._format_xml(root)
    
    def generate_hadoop_env(self) -> str:
        """
        生成hadoop-env.sh配置
        
        Returns:
            str: hadoop-env.sh内容
        """
        env_content = f'''#!/bin/bash

# Hadoop Environment Configuration
# Generated by HadoopConfigGenerator

# Java配置
export JAVA_HOME={os.environ.get('JAVA_HOME', '/usr/lib/jvm/java-8-openjdk-amd64')}

# Hadoop配置
export HADOOP_HOME={os.environ.get('HADOOP_HOME', '/opt/hadoop')}
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_LOG_DIR=/var/log/hadoop

# HDFS配置
export HDFS_NAMENODE_USER=hadoop
export HDFS_DATANODE_USER=hadoop
export HDFS_SECONDARYNAMENODE_USER=hadoop

# YARN配置
export YARN_RESOURCEMANAGER_USER=hadoop
export YARN_NODEMANAGER_USER=hadoop

# MapReduce配置
export MAPRED_HISTORYSERVER_USER=hadoop

# JVM堆大小配置
export HADOOP_NAMENODE_OPTS="-Xmx{self.config.namenode_heap_size}m -Dhadoop.security.logger=INFO,RFAS"
export HADOOP_DATANODE_OPTS="-Xmx{self.config.datanode_heap_size}m -Dhadoop.security.logger=ERROR,RFAS"
export YARN_RESOURCEMANAGER_OPTS="-Xmx{self.config.resource_manager_heap_size}m"
export YARN_NODEMANAGER_OPTS="-Xmx{self.config.node_manager_heap_size}m"

# 其他配置
export HADOOP_PID_DIR=/var/run/hadoop
export HADOOP_SECURE_DN_PID_DIR=/var/run/hadoop
export HADOOP_IDENT_STRING=hadoop

# 日志配置
export HADOOP_ROOT_LOGGER=INFO,console
export HADOOP_SECURITY_LOGGER=INFO,NullAppender

# 本地库路径
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
'''
        return env_content
    
    def generate_workers(self) -> str:
        """
        生成workers文件
        
        Returns:
            str: workers文件内容
        """
        return '\n'.join(self.config.worker_nodes) + '\n'
    
    def generate_all_configs(self, output_dir: str) -> Dict[str, str]:
        """
        生成所有配置文件
        
        Args:
            output_dir: 输出目录
            
        Returns:
            Dict[str, str]: 配置文件路径映射
        """
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        configs = {
            'core-site.xml': self.generate_core_site(),
            'hdfs-site.xml': self.generate_hdfs_site(),
            'yarn-site.xml': self.generate_yarn_site(),
            'mapred-site.xml': self.generate_mapred_site(),
            'hadoop-env.sh': self.generate_hadoop_env(),
            'workers': self.generate_workers()
        }
        
        file_paths = {}
        
        for filename, content in configs.items():
            file_path = output_path / filename
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(content)
            
            # 设置hadoop-env.sh为可执行
            if filename == 'hadoop-env.sh':
                os.chmod(file_path, 0o755)
            
            file_paths[filename] = str(file_path)
            print(f"✓ 生成配置文件: {file_path}")
        
        return file_paths
    
    def validate_config(self) -> Dict[str, Any]:
        """
        验证配置参数
        
        Returns:
            Dict[str, Any]: 验证结果
        """
        validation_result = {
            'valid': True,
            'errors': [],
            'warnings': []
        }
        
        # 检查必需的主机名
        if not self.config.namenode_host:
            validation_result['valid'] = False
            validation_result['errors'].append("NameNode主机名不能为空")
        
        if not self.config.resource_manager_host:
            validation_result['valid'] = False
            validation_result['errors'].append("ResourceManager主机名不能为空")
        
        # 检查端口范围
        ports = [
            ('namenode_port', self.config.namenode_port),
            ('namenode_http_port', self.config.namenode_http_port),
            ('resource_manager_port', self.config.resource_manager_port),
            ('resource_manager_webapp_port', self.config.resource_manager_webapp_port),
            ('history_server_port', self.config.history_server_port)
        ]
        
        for port_name, port_value in ports:
            if not (1024 <= port_value <= 65535):
                validation_result['valid'] = False
                validation_result['errors'].append(f"{port_name} 端口 {port_value} 超出有效范围 (1024-65535)")
        
        # 检查内存配置
        memory_configs = [
            ('namenode_heap_size', self.config.namenode_heap_size),
            ('datanode_heap_size', self.config.datanode_heap_size),
            ('resource_manager_heap_size', self.config.resource_manager_heap_size),
            ('node_manager_heap_size', self.config.node_manager_heap_size)
        ]
        
        for mem_name, mem_value in memory_configs:
            if mem_value < 512:
                validation_result['warnings'].append(f"{mem_name} 内存配置 {mem_value}MB 可能过小")
            elif mem_value > 8192:
                validation_result['warnings'].append(f"{mem_name} 内存配置 {mem_value}MB 可能过大")
        
        # 检查副本数
        if self.config.replication_factor < 1:
            validation_result['valid'] = False
            validation_result['errors'].append("副本数不能小于1")
        elif self.config.replication_factor > len(self.config.worker_nodes):
            validation_result['warnings'].append(f"副本数 {self.config.replication_factor} 大于Worker节点数 {len(self.config.worker_nodes)}")
        
        # 检查Worker节点
        if not self.config.worker_nodes:
            validation_result['valid'] = False
            validation_result['errors'].append("Worker节点列表不能为空")
        
        return validation_result

# 使用示例
if __name__ == "__main__":
    # 创建配置对象
    config = HadoopConfig(
        cluster_name="my-hadoop-cluster",
        namenode_host="master1",
        secondary_namenode_host="master2",
        resource_manager_host="master1",
        history_server_host="master1",
        worker_nodes=["worker1", "worker2", "worker3"],
        replication_factor=3
    )
    
    # 创建配置生成器
    generator = HadoopConfigGenerator(config)
    
    # 验证配置
    validation = generator.validate_config()
    print("=== 配置验证结果 ===")
    print(f"配置有效: {validation['valid']}")
    
    if validation['errors']:
        print("错误:")
        for error in validation['errors']:
            print(f"  - {error}")
    
    if validation['warnings']:
        print("警告:")
        for warning in validation['warnings']:
            print(f"  - {warning}")
    
    if validation['valid']:
        # 生成配置文件
        print("\n=== 生成配置文件 ===")
        output_dir = "/tmp/hadoop-config"
        file_paths = generator.generate_all_configs(output_dir)
        
        print(f"\n配置文件已生成到: {output_dir}")
        for filename, filepath in file_paths.items():
            print(f"  {filename}: {filepath}")

2.5.3 Deploying the Configuration Files

#!/bin/bash
# Hadoop配置文件部署脚本

# 配置参数
HADOOP_USER="hadoop"
HADOOP_HOME="/opt/hadoop"
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
CONFIG_SOURCE_DIR="/tmp/hadoop-config"
NODES=("master1" "master2" "worker1" "worker2" "worker3")

# 备份现有配置
backup_configs() {
    echo "备份现有配置文件..."
    
    local backup_dir="$HADOOP_CONF_DIR/backup-$(date +%Y%m%d-%H%M%S)"
    
    if [ -d "$HADOOP_CONF_DIR" ]; then
        sudo -u "$HADOOP_USER" mkdir -p "$backup_dir"
        sudo -u "$HADOOP_USER" cp -r "$HADOOP_CONF_DIR"/* "$backup_dir"/
        echo "✓ 配置文件已备份到: $backup_dir"
    else
        echo "配置目录不存在,跳过备份"
    fi
}

# 部署配置文件到本地
deploy_local_configs() {
    echo "部署配置文件到本地..."
    
    local config_files=("core-site.xml" "hdfs-site.xml" "yarn-site.xml" "mapred-site.xml" "hadoop-env.sh" "workers")
    
    for config_file in "${config_files[@]}"; do
        local source_file="$CONFIG_SOURCE_DIR/$config_file"
        local target_file="$HADOOP_CONF_DIR/$config_file"
        
        if [ -f "$source_file" ]; then
            sudo -u "$HADOOP_USER" cp "$source_file" "$target_file"
            
            # 设置权限
            if [ "$config_file" = "hadoop-env.sh" ]; then
                sudo chmod 755 "$target_file"
            else
                sudo chmod 644 "$target_file"
            fi
            
            echo "✓ 部署配置文件: $config_file"
        else
            echo "✗ 配置文件不存在: $source_file"
        fi
    done
}

# 分发配置文件到集群
distribute_configs() {
    echo "分发配置文件到集群节点..."
    
    local current_node=$(hostname)
    
    for node in "${NODES[@]}"; do
        if [ "$node" != "$current_node" ]; then
            echo "正在分发配置到节点: $node"
            
            # 创建远程配置目录
            ssh "$HADOOP_USER@$node" "mkdir -p $HADOOP_CONF_DIR"
            
            # 复制配置文件
            scp -r "$HADOOP_CONF_DIR"/* "$HADOOP_USER@$node:$HADOOP_CONF_DIR/"
            
            if [ $? -eq 0 ]; then
                echo "✓ 配置文件已分发到: $node"
            else
                echo "✗ 配置文件分发失败: $node"
            fi
        fi
    done
}

# 验证配置文件
validate_configs() {
    echo "验证配置文件..."
    
    local config_files=("core-site.xml" "hdfs-site.xml" "yarn-site.xml" "mapred-site.xml")
    local validation_passed=true
    
    for config_file in "${config_files[@]}"; do
        local config_path="$HADOOP_CONF_DIR/$config_file"
        
        if [ -f "$config_path" ]; then
            # 检查XML格式
            if xmllint --noout "$config_path" 2>/dev/null; then
                echo "✓ $config_file 格式正确"
            else
                echo "✗ $config_file 格式错误"
                validation_passed=false
            fi
        else
            echo "✗ $config_file 文件不存在"
            validation_passed=false
        fi
    done
    
    # 检查hadoop-env.sh
    if [ -f "$HADOOP_CONF_DIR/hadoop-env.sh" ]; then
        if bash -n "$HADOOP_CONF_DIR/hadoop-env.sh" 2>/dev/null; then
            echo "✓ hadoop-env.sh 语法正确"
        else
            echo "✗ hadoop-env.sh 语法错误"
            validation_passed=false
        fi
    else
        echo "✗ hadoop-env.sh 文件不存在"
        validation_passed=false
    fi
    
    # 检查workers文件
    if [ -f "$HADOOP_CONF_DIR/workers" ]; then
        echo "✓ workers 文件存在"
        echo "Worker节点列表:"
        cat "$HADOOP_CONF_DIR/workers" | while read -r worker; do
            echo "  - $worker"
        done
    else
        echo "✗ workers 文件不存在"
        validation_passed=false
    fi
    
    if [ "$validation_passed" = true ]; then
        echo "✓ 所有配置文件验证通过"
        return 0
    else
        echo "✗ 配置文件验证失败"
        return 1
    fi
}

# 显示配置摘要
show_config_summary() {
    echo "=== 配置摘要 ==="
    
    # 从core-site.xml提取信息
    if [ -f "$HADOOP_CONF_DIR/core-site.xml" ]; then
        local fs_default=$(xmllint --xpath "//property[name='fs.defaultFS']/value/text()" "$HADOOP_CONF_DIR/core-site.xml" 2>/dev/null)
        echo "默认文件系统: $fs_default"
    fi
    
    # 从hdfs-site.xml提取信息
    if [ -f "$HADOOP_CONF_DIR/hdfs-site.xml" ]; then
        local replication=$(xmllint --xpath "//property[name='dfs.replication']/value/text()" "$HADOOP_CONF_DIR/hdfs-site.xml" 2>/dev/null)
        echo "副本数: $replication"
        
        local namenode_http=$(xmllint --xpath "//property[name='dfs.namenode.http-address']/value/text()" "$HADOOP_CONF_DIR/hdfs-site.xml" 2>/dev/null)
        echo "NameNode Web界面: http://$namenode_http"
    fi
    
    # 从yarn-site.xml提取信息
    if [ -f "$HADOOP_CONF_DIR/yarn-site.xml" ]; then
        local rm_webapp=$(xmllint --xpath "//property[name='yarn.resourcemanager.webapp.address']/value/text()" "$HADOOP_CONF_DIR/yarn-site.xml" 2>/dev/null)
        echo "ResourceManager Web界面: http://$rm_webapp"
    fi
    
    # 从mapred-site.xml提取信息
    if [ -f "$HADOOP_CONF_DIR/mapred-site.xml" ]; then
        local history_webapp=$(xmllint --xpath "//property[name='mapreduce.jobhistory.webapp.address']/value/text()" "$HADOOP_CONF_DIR/mapred-site.xml" 2>/dev/null)
        echo "JobHistory Web界面: http://$history_webapp"
    fi
}

# 主函数
main() {
    echo "=== 开始部署Hadoop配置文件 ==="
    
    # 检查配置源目录
    if [ ! -d "$CONFIG_SOURCE_DIR" ]; then
        echo "错误: 配置源目录不存在: $CONFIG_SOURCE_DIR"
        echo "请先使用配置生成工具生成配置文件"
        exit 1
    fi
    
    # 检查是否以hadoop用户运行
    if [ "$(whoami)" != "$HADOOP_USER" ]; then
        echo "请以 $HADOOP_USER 用户运行此脚本"
        exit 1
    fi
    
    # 执行部署步骤
    backup_configs
    deploy_local_configs
    validate_configs || exit 1
    distribute_configs
    show_config_summary
    
    echo "=== 配置文件部署完成 ==="
    echo "请在所有节点上验证配置文件是否正确部署"
}

main
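
After distribution it is worth confirming that every node ends up with identical configuration files. A minimal sketch that compares checksums across the same node list used by the script above (paths and node names as assumed in this chapter):

HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
for node in master1 master2 worker1 worker2 worker3; do
    echo "== $node =="
    ssh "hadoop@$node" "md5sum $HADOOP_CONF_DIR/*-site.xml $HADOOP_CONF_DIR/workers"
done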

Section Summary

This section walked through Hadoop installation and configuration, covering:

  1. Environment preparation: system requirements, cluster planning, node role assignment
  2. Java environment configuration: JDK installation, environment variables, verification
  3. Passwordless SSH configuration: key generation and distribution, batch operation tooling
  4. Hadoop download and installation: download, user creation, installation, directory layout
  5. Basic configuration files: file descriptions, generation tooling, deployment scripts

With these steps and tools, the basic installation and configuration of a Hadoop cluster can be completed quickly. The next section covers the specific cluster deployment modes and the startup procedure.

