2.1 环境准备
2.1.1 系统要求
硬件要求
- CPU:64位处理器,建议4核以上
- 内存:最小8GB,推荐16GB以上
- 存储:最小100GB可用空间,推荐SSD
- 网络:千兆以太网
软件要求
- 操作系统:Linux(CentOS 7+、Ubuntu 18.04+、RHEL 7+)
- Java:Oracle JDK 8 或 OpenJDK 8/11
- SSH:用于集群节点间通信
- Python:2.7+ 或 3.6+(可选,用于某些工具)
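在开始安装之前,可以先用一段简单的脚本对上述硬件和软件要求做初步自检。下面是一个最小化的检查示例(阈值按上文给出的最低要求硬编码,且只检查根分区和本机环境,属于示意性质,实际检查范围请按集群规划调整):
#!/bin/bash
# 环境预检示例:检查CPU核心数、内存、可用磁盘空间和Java是否可用
# 阈值为上文的最低要求,可按需调整

check() {
    # $1=检查项 $2=实际值 $3=最小值
    if [ "$2" -ge "$3" ]; then
        echo "✓ $1: $2(满足 >= $3)"
    else
        echo "✗ $1: $2(要求 >= $3)"
    fi
}

# CPU核心数(建议4核以上)
check "CPU核心数" "$(nproc)" 4

# 内存总量,单位GB(最小8GB)
mem_gb=$(free -g | awk '/^Mem:/{print $2}')
check "内存(GB)" "$mem_gb" 8

# 根分区可用空间,单位GB(最小100GB,生产环境应检查实际数据盘)
disk_gb=$(df -P -BG / | awk 'NR==2{gsub("G","",$4); print $4}')
check "可用磁盘(GB)" "$disk_gb" 100

# Java是否可用(安装步骤见2.2节)
if command -v java >/dev/null 2>&1; then
    echo "✓ Java已安装: $(java -version 2>&1 | head -1)"
else
    echo "✗ 未检测到Java,请先完成2.2节的Java环境配置"
fi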
2.1.2 集群规划
单机模式(Standalone Mode)
# 单机模式特点
- 所有进程运行在单个JVM中
- 主要用于开发和测试
- 不使用HDFS,直接使用本地文件系统
- 配置简单,启动快速
伪分布式模式(Pseudo-Distributed Mode)
# 伪分布式模式特点
- 所有守护进程运行在单台机器上
- 使用HDFS和YARN
- 模拟分布式环境
- 适合学习和小规模测试
完全分布式模式(Fully Distributed Mode)
# 完全分布式模式特点
- 守护进程分布在多台机器上
- 真正的分布式环境
- 生产环境推荐模式
- 高可用和容错能力
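三种模式的差异最终体现在配置项与本机常驻进程上。下面给出一个简单的检查示例(假设Hadoop已按后文步骤安装、环境变量已生效;hdfs getconf和jps分别是Hadoop与JDK自带的命令):
#!/bin/bash
# 查看当前部署模式的简单示例
# fs.defaultFS 为 file:/// 时通常是单机模式;
# 指向 hdfs://localhost:... 时为伪分布式;指向集群NameNode时为完全分布式
echo "fs.defaultFS = $(hdfs getconf -confKey fs.defaultFS)"

# 查看本机正在运行的Hadoop守护进程
# 单机模式下通常没有常驻守护进程;伪分布式下NameNode/DataNode等全部运行在本机
jps | grep -E 'NameNode|DataNode|ResourceManager|NodeManager|SecondaryNameNode' \
    || echo "本机当前没有运行Hadoop守护进程"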
2.1.3 节点角色规划
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hadoop集群节点规划工具
"""
from typing import Dict, List, Any
from dataclasses import dataclass
from enum import Enum
class NodeType(Enum):
"""节点类型"""
MASTER = "master"
WORKER = "worker"
EDGE = "edge"
GATEWAY = "gateway"
class ServiceType(Enum):
"""服务类型"""
NAMENODE = "namenode"
SECONDARY_NAMENODE = "secondary_namenode"
DATANODE = "datanode"
RESOURCE_MANAGER = "resource_manager"
NODE_MANAGER = "node_manager"
HISTORY_SERVER = "history_server"
ZOOKEEPER = "zookeeper"
HIVE_METASTORE = "hive_metastore"
SPARK_MASTER = "spark_master"
SPARK_WORKER = "spark_worker"
@dataclass
class NodeSpec:
"""节点规格"""
hostname: str
ip_address: str
cpu_cores: int
memory_gb: int
disk_gb: int
node_type: NodeType
services: List[ServiceType]
class ClusterPlanner:
"""集群规划器"""
def __init__(self):
self.nodes: List[NodeSpec] = []
self.service_requirements = {
ServiceType.NAMENODE: {'min_memory': 4, 'min_cores': 2, 'ha_support': True},
ServiceType.DATANODE: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
ServiceType.RESOURCE_MANAGER: {'min_memory': 4, 'min_cores': 2, 'ha_support': True},
ServiceType.NODE_MANAGER: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
ServiceType.SECONDARY_NAMENODE: {'min_memory': 2, 'min_cores': 1, 'ha_support': False},
ServiceType.HISTORY_SERVER: {'min_memory': 1, 'min_cores': 1, 'ha_support': False},
ServiceType.ZOOKEEPER: {'min_memory': 1, 'min_cores': 1, 'ha_support': True}
}
def add_node(self, hostname: str, ip_address: str, cpu_cores: int,
memory_gb: int, disk_gb: int, node_type: NodeType) -> bool:
"""
添加节点
Args:
hostname: 主机名
ip_address: IP地址
cpu_cores: CPU核心数
memory_gb: 内存大小(GB)
disk_gb: 磁盘大小(GB)
node_type: 节点类型
Returns:
bool: 添加是否成功
"""
try:
node = NodeSpec(
hostname=hostname,
ip_address=ip_address,
cpu_cores=cpu_cores,
memory_gb=memory_gb,
disk_gb=disk_gb,
node_type=node_type,
services=[]
)
self.nodes.append(node)
print(f"节点添加成功: {hostname} ({ip_address})")
return True
except Exception as e:
print(f"节点添加失败: {e}")
return False
def plan_services(self, enable_ha: bool = False) -> Dict[str, Any]:
"""
规划服务分布
Args:
enable_ha: 是否启用高可用
Returns:
Dict: 服务分布规划
"""
if not self.nodes:
return {'error': '没有可用节点'}
# 按节点类型分组
master_nodes = [n for n in self.nodes if n.node_type == NodeType.MASTER]
worker_nodes = [n for n in self.nodes if n.node_type == NodeType.WORKER]
if not master_nodes:
return {'error': '没有Master节点'}
if not worker_nodes:
return {'error': '没有Worker节点'}
# 服务分布规划
service_plan = {
'master_services': {},
'worker_services': {},
'recommendations': [],
'warnings': []
}
# Master节点服务分布
primary_master = master_nodes[0]
primary_master.services.extend([
ServiceType.NAMENODE,
ServiceType.RESOURCE_MANAGER,
ServiceType.HISTORY_SERVER
])
service_plan['master_services'][primary_master.hostname] = [
s.value for s in primary_master.services
]
# 高可用配置
if enable_ha and len(master_nodes) >= 2:
secondary_master = master_nodes[1]
secondary_master.services.extend([
ServiceType.NAMENODE, # Standby NameNode
ServiceType.RESOURCE_MANAGER # Standby ResourceManager
])
service_plan['master_services'][secondary_master.hostname] = [
s.value for s in secondary_master.services
]
# ZooKeeper集群(奇数个节点)
zk_nodes = master_nodes[:3] if len(master_nodes) >= 3 else master_nodes
for node in zk_nodes:
if ServiceType.ZOOKEEPER not in node.services:
node.services.append(ServiceType.ZOOKEEPER)
if node.hostname not in service_plan['master_services']:
service_plan['master_services'][node.hostname] = []
service_plan['master_services'][node.hostname].append(ServiceType.ZOOKEEPER.value)
else:
# 非HA模式,添加Secondary NameNode
if len(master_nodes) >= 2:
secondary_master = master_nodes[1]
secondary_master.services.append(ServiceType.SECONDARY_NAMENODE)
service_plan['master_services'][secondary_master.hostname] = [
ServiceType.SECONDARY_NAMENODE.value
]
else:
primary_master.services.append(ServiceType.SECONDARY_NAMENODE)
# Worker节点服务分布
for worker in worker_nodes:
worker.services.extend([
ServiceType.DATANODE,
ServiceType.NODE_MANAGER
])
service_plan['worker_services'][worker.hostname] = [
s.value for s in worker.services
]
# 生成建议和警告
self._generate_recommendations(service_plan, enable_ha)
return service_plan
def _generate_recommendations(self, service_plan: Dict[str, Any], enable_ha: bool):
"""
生成建议和警告
Args:
service_plan: 服务规划
enable_ha: 是否启用高可用
"""
recommendations = []
warnings = []
# 检查节点数量
master_count = len([n for n in self.nodes if n.node_type == NodeType.MASTER])
worker_count = len([n for n in self.nodes if n.node_type == NodeType.WORKER])
if master_count < 2 and enable_ha:
warnings.append("高可用模式建议至少2个Master节点")
if worker_count < 3:
warnings.append("建议至少3个Worker节点以保证数据可靠性")
# 检查资源配置
for node in self.nodes:
if node.node_type == NodeType.MASTER:
if node.memory_gb < 8:
warnings.append(f"Master节点 {node.hostname} 内存不足,建议至少8GB")
if node.cpu_cores < 4:
warnings.append(f"Master节点 {node.hostname} CPU核心数不足,建议至少4核")
if node.node_type == NodeType.WORKER:
if node.memory_gb < 4:
warnings.append(f"Worker节点 {node.hostname} 内存不足,建议至少4GB")
if node.disk_gb < 100:
warnings.append(f"Worker节点 {node.hostname} 磁盘空间不足,建议至少100GB")
# 生成建议
recommendations.extend([
"配置SSH免密登录",
"同步所有节点时间",
"配置防火墙规则",
"设置合适的JVM堆大小",
"配置数据目录权限"
])
if enable_ha:
recommendations.extend([
"配置ZooKeeper集群",
"设置共享存储(如NFS)用于NameNode元数据",
"配置自动故障转移"
])
service_plan['recommendations'] = recommendations
service_plan['warnings'] = warnings
def generate_hosts_file(self) -> str:
"""
生成hosts文件内容
Returns:
str: hosts文件内容
"""
hosts_content = "# Hadoop Cluster Hosts\n"
for node in self.nodes:
hosts_content += f"{node.ip_address}\t{node.hostname}\n"
return hosts_content
def generate_inventory(self) -> Dict[str, List[str]]:
"""
生成Ansible inventory
Returns:
Dict: Ansible inventory配置
"""
inventory = {
'masters': [],
'workers': [],
'all': []
}
for node in self.nodes:
inventory['all'].append(node.hostname)
if node.node_type == NodeType.MASTER:
inventory['masters'].append(node.hostname)
elif node.node_type == NodeType.WORKER:
inventory['workers'].append(node.hostname)
return inventory
def validate_cluster(self) -> Dict[str, Any]:
"""
验证集群配置
Returns:
Dict: 验证结果
"""
validation_result = {
'valid': True,
'errors': [],
'warnings': [],
'summary': {}
}
# 检查基本要求
if len(self.nodes) == 0:
validation_result['valid'] = False
validation_result['errors'].append("集群中没有节点")
return validation_result
master_nodes = [n for n in self.nodes if n.node_type == NodeType.MASTER]
worker_nodes = [n for n in self.nodes if n.node_type == NodeType.WORKER]
if len(master_nodes) == 0:
validation_result['valid'] = False
validation_result['errors'].append("集群中没有Master节点")
if len(worker_nodes) == 0:
validation_result['valid'] = False
validation_result['errors'].append("集群中没有Worker节点")
# 检查IP地址唯一性
ip_addresses = [node.ip_address for node in self.nodes]
if len(ip_addresses) != len(set(ip_addresses)):
validation_result['valid'] = False
validation_result['errors'].append("存在重复的IP地址")
# 检查主机名唯一性
hostnames = [node.hostname for node in self.nodes]
if len(hostnames) != len(set(hostnames)):
validation_result['valid'] = False
validation_result['errors'].append("存在重复的主机名")
# 生成摘要
validation_result['summary'] = {
'total_nodes': len(self.nodes),
'master_nodes': len(master_nodes),
'worker_nodes': len(worker_nodes),
'total_memory': sum(node.memory_gb for node in self.nodes),
'total_cores': sum(node.cpu_cores for node in self.nodes),
'total_storage': sum(node.disk_gb for node in self.nodes)
}
return validation_result
# 使用示例
if __name__ == "__main__":
# 创建集群规划器
planner = ClusterPlanner()
# 添加节点
planner.add_node("master1", "192.168.1.10", 8, 16, 500, NodeType.MASTER)
planner.add_node("master2", "192.168.1.11", 8, 16, 500, NodeType.MASTER)
planner.add_node("worker1", "192.168.1.20", 4, 8, 1000, NodeType.WORKER)
planner.add_node("worker2", "192.168.1.21", 4, 8, 1000, NodeType.WORKER)
planner.add_node("worker3", "192.168.1.22", 4, 8, 1000, NodeType.WORKER)
# 验证集群配置
validation = planner.validate_cluster()
print("=== 集群验证结果 ===")
print(f"配置有效: {validation['valid']}")
if validation['errors']:
print("错误:")
for error in validation['errors']:
print(f" - {error}")
print(f"\n集群摘要:")
summary = validation['summary']
print(f" 总节点数: {summary['total_nodes']}")
print(f" Master节点: {summary['master_nodes']}")
print(f" Worker节点: {summary['worker_nodes']}")
print(f" 总内存: {summary['total_memory']}GB")
print(f" 总CPU核心: {summary['total_cores']}")
print(f" 总存储: {summary['total_storage']}GB")
# 规划服务分布
service_plan = planner.plan_services(enable_ha=True)
print("\n=== 服务分布规划 ===")
print("Master节点服务:")
for hostname, services in service_plan['master_services'].items():
print(f" {hostname}: {', '.join(services)}")
print("\nWorker节点服务:")
for hostname, services in service_plan['worker_services'].items():
print(f" {hostname}: {', '.join(services)}")
if service_plan['warnings']:
print("\n警告:")
for warning in service_plan['warnings']:
print(f" - {warning}")
print("\n建议:")
for recommendation in service_plan['recommendations']:
print(f" - {recommendation}")
# 生成hosts文件
hosts_content = planner.generate_hosts_file()
print("\n=== Hosts文件内容 ===")
print(hosts_content)
2.2 Java环境配置
2.2.1 Java安装
方式一:使用包管理器安装OpenJDK
#!/bin/bash
# CentOS/RHEL系统
sudo yum update -y
sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# Ubuntu/Debian系统
sudo apt update
sudo apt install -y openjdk-8-jdk
# 验证安装
java -version
javac -version
方式二:手动安装Oracle JDK
#!/bin/bash
# 下载Oracle JDK 8(需要Oracle账号)
# 或者使用已下载的JDK包
# 创建Java安装目录
sudo mkdir -p /opt/java
# 解压JDK
sudo tar -xzf jdk-8u291-linux-x64.tar.gz -C /opt/java/
# 创建符号链接
sudo ln -s /opt/java/jdk1.8.0_291 /opt/java/current
# 设置环境变量
echo 'export JAVA_HOME=/opt/java/current' | sudo tee -a /etc/profile
echo 'export PATH=$JAVA_HOME/bin:$PATH' | sudo tee -a /etc/profile
# 重新加载环境变量
source /etc/profile
# 验证安装
java -version
2.2.2 环境变量配置
#!/bin/bash
# 创建Java环境配置脚本
cat > /tmp/setup_java_env.sh << 'EOF'
#!/bin/bash
# 检测Java安装路径
detect_java_home() {
if [ -n "$JAVA_HOME" ] && [ -x "$JAVA_HOME/bin/java" ]; then
echo "JAVA_HOME已设置: $JAVA_HOME"
return 0
fi
# 常见的Java安装路径
JAVA_PATHS=(
"/usr/lib/jvm/java-8-openjdk-amd64"
"/usr/lib/jvm/java-1.8.0-openjdk"
"/opt/java/current"
"/usr/java/default"
"/usr/java/latest"
)
for path in "${JAVA_PATHS[@]}"; do
if [ -x "$path/bin/java" ]; then
export JAVA_HOME="$path"
echo "检测到Java安装路径: $JAVA_HOME"
return 0
fi
done
echo "错误: 未找到Java安装路径"
return 1
}
# 配置环境变量
setup_java_env() {
if ! detect_java_home; then
exit 1
fi
# 创建环境变量配置文件
cat > /tmp/java_env.sh << EOL
# Java Environment Variables
export JAVA_HOME=$JAVA_HOME
export JRE_HOME=\$JAVA_HOME/jre
export CLASSPATH=.:\$JAVA_HOME/lib:\$JRE_HOME/lib
export PATH=\$JAVA_HOME/bin:\$PATH
EOL
# 添加到系统环境变量
sudo cp /tmp/java_env.sh /etc/profile.d/
sudo chmod +x /etc/profile.d/java_env.sh
# 添加到用户环境变量
if ! grep -q "JAVA_HOME" ~/.bashrc; then
cat /tmp/java_env.sh >> ~/.bashrc
fi
# 重新加载环境变量
source /etc/profile.d/java_env.sh
source ~/.bashrc
echo "Java环境变量配置完成"
}
# 验证Java环境
verify_java() {
echo "=== Java环境验证 ==="
echo "JAVA_HOME: $JAVA_HOME"
echo "Java版本:"
java -version
echo "Javac版本:"
javac -version
# 检查Java版本是否为1.8
JAVA_VERSION=$(java -version 2>&1 | grep "version" | awk '{print $3}' | sed 's/"//g')
if [[ $JAVA_VERSION == 1.8* ]]; then
echo "✓ Java版本符合要求 (1.8.x)"
else
echo "⚠ 警告: Java版本不是1.8.x,可能存在兼容性问题"
fi
}
# 主函数
main() {
echo "开始配置Java环境..."
setup_java_env
verify_java
echo "Java环境配置完成!"
}
main
EOF
# 执行Java环境配置
chmod +x /tmp/setup_java_env.sh
/tmp/setup_java_env.sh
2.3 SSH免密登录配置
2.3.1 SSH密钥生成和分发
#!/bin/bash
# SSH免密登录配置脚本
# 配置参数
USER="hadoop" # Hadoop用户
NODES=("master1" "master2" "worker1" "worker2" "worker3") # 集群节点
# 生成SSH密钥对
generate_ssh_keys() {
echo "生成SSH密钥对..."
# 检查是否已存在密钥
if [ -f ~/.ssh/id_rsa ]; then
echo "SSH密钥已存在,跳过生成"
return 0
fi
# 生成密钥对(无密码)
ssh-keygen -t rsa -b 4096 -f ~/.ssh/id_rsa -N ""
if [ $? -eq 0 ]; then
echo "✓ SSH密钥生成成功"
else
echo "✗ SSH密钥生成失败"
return 1
fi
}
# 分发公钥到所有节点
distribute_public_key() {
echo "分发公钥到集群节点..."
for node in "${NODES[@]}"; do
echo "正在配置节点: $node"
# 复制公钥到目标节点
ssh-copy-id -i ~/.ssh/id_rsa.pub $USER@$node
if [ $? -eq 0 ]; then
echo "✓ 公钥已成功复制到 $node"
else
echo "✗ 公钥复制到 $node 失败"
fi
done
}
# 测试SSH免密登录
test_ssh_connection() {
echo "测试SSH免密登录..."
for node in "${NODES[@]}"; do
echo "测试连接到: $node"
# 测试SSH连接
ssh -o BatchMode=yes -o ConnectTimeout=5 $USER@$node 'echo "SSH连接成功"'
if [ $? -eq 0 ]; then
echo "✓ $node SSH免密登录正常"
else
echo "✗ $node SSH免密登录失败"
fi
done
}
# 配置SSH客户端
configure_ssh_client() {
echo "配置SSH客户端..."
# 创建SSH配置文件
cat > ~/.ssh/config << EOF
# Hadoop Cluster SSH Configuration
Host *
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
LogLevel ERROR
ServerAliveInterval 60
ServerAliveCountMax 3
EOF
# 为每个节点添加配置
for node in "${NODES[@]}"; do
cat >> ~/.ssh/config << EOF
Host $node
HostName $node
User $USER
IdentityFile ~/.ssh/id_rsa
EOF
done
# 设置权限
chmod 600 ~/.ssh/config
echo "✓ SSH客户端配置完成"
}
# 主函数
main() {
echo "=== 开始配置SSH免密登录 ==="
# 确保.ssh目录存在
mkdir -p ~/.ssh
chmod 700 ~/.ssh
# 执行配置步骤
generate_ssh_keys
configure_ssh_client
distribute_public_key
test_ssh_connection
echo "=== SSH免密登录配置完成 ==="
}
# 检查是否以正确用户运行
if [ "$(whoami)" != "$USER" ]; then
echo "请以 $USER 用户运行此脚本"
exit 1
fi
main
2.3.2 批量SSH操作工具
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量SSH操作工具
用于在Hadoop集群中批量执行命令
"""
import subprocess
import threading
import time
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
class SSHManager:
"""SSH管理器"""
def __init__(self, nodes: List[str], user: str = "hadoop",
timeout: int = 30, max_workers: int = 10):
"""
初始化SSH管理器
Args:
nodes: 节点列表
user: SSH用户名
timeout: 连接超时时间
max_workers: 最大并发数
"""
self.nodes = nodes
self.user = user
self.timeout = timeout
self.max_workers = max_workers
def execute_command(self, node: str, command: str) -> Dict[str, Any]:
"""
在单个节点执行命令
Args:
node: 目标节点
command: 要执行的命令
Returns:
Dict: 执行结果
"""
try:
# 构建SSH命令
ssh_cmd = [
"ssh",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=5",
"-o", "StrictHostKeyChecking=no",
f"{self.user}@{node}",
command
]
# 执行命令
start_time = time.time()
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
end_time = time.time()
return {
'node': node,
'command': command,
'success': result.returncode == 0,
'return_code': result.returncode,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip(),
'execution_time': round(end_time - start_time, 2)
}
except subprocess.TimeoutExpired:
return {
'node': node,
'command': command,
'success': False,
'return_code': -1,
'stdout': '',
'stderr': f'Command timeout after {self.timeout} seconds',
'execution_time': self.timeout
}
except Exception as e:
return {
'node': node,
'command': command,
'success': False,
'return_code': -1,
'stdout': '',
'stderr': str(e),
'execution_time': 0
}
def execute_on_all(self, command: str, parallel: bool = True) -> List[Dict[str, Any]]:
"""
在所有节点执行命令
Args:
command: 要执行的命令
parallel: 是否并行执行
Returns:
List[Dict]: 所有节点的执行结果
"""
results = []
if parallel:
# 并行执行
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_node = {
executor.submit(self.execute_command, node, command): node
for node in self.nodes
}
for future in as_completed(future_to_node):
result = future.result()
results.append(result)
else:
# 串行执行
for node in self.nodes:
result = self.execute_command(node, command)
results.append(result)
# 按节点名排序
results.sort(key=lambda x: x['node'])
return results
def copy_file(self, local_path: str, remote_path: str,
nodes: Optional[List[str]] = None) -> List[Dict[str, Any]]:
"""
复制文件到节点
Args:
local_path: 本地文件路径
remote_path: 远程文件路径
nodes: 目标节点列表,默认为所有节点
Returns:
List[Dict]: 复制结果
"""
target_nodes = nodes or self.nodes
results = []
def copy_to_node(node: str) -> Dict[str, Any]:
try:
scp_cmd = [
"scp",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=5",
"-o", "StrictHostKeyChecking=no",
local_path,
f"{self.user}@{node}:{remote_path}"
]
start_time = time.time()
result = subprocess.run(
scp_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
end_time = time.time()
return {
'node': node,
'operation': 'copy_file',
'local_path': local_path,
'remote_path': remote_path,
'success': result.returncode == 0,
'stderr': result.stderr.strip(),
'execution_time': round(end_time - start_time, 2)
}
except Exception as e:
return {
'node': node,
'operation': 'copy_file',
'local_path': local_path,
'remote_path': remote_path,
'success': False,
'stderr': str(e),
'execution_time': 0
}
# 并行复制
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_node = {
executor.submit(copy_to_node, node): node
for node in target_nodes
}
for future in as_completed(future_to_node):
result = future.result()
results.append(result)
results.sort(key=lambda x: x['node'])
return results
def check_connectivity(self) -> Dict[str, Any]:
"""
检查所有节点的连通性
Returns:
Dict: 连通性检查结果
"""
results = self.execute_on_all("echo 'SSH connection test'")
summary = {
'total_nodes': len(self.nodes),
'reachable_nodes': 0,
'unreachable_nodes': 0,
'node_status': {},
'unreachable_list': []
}
for result in results:
node = result['node']
if result['success']:
summary['reachable_nodes'] += 1
summary['node_status'][node] = 'reachable'
else:
summary['unreachable_nodes'] += 1
summary['node_status'][node] = 'unreachable'
summary['unreachable_list'].append(node)
return summary
def get_system_info(self) -> List[Dict[str, Any]]:
"""
获取所有节点的系统信息
Returns:
List[Dict]: 系统信息
"""
commands = {
'hostname': 'hostname',
'os_release': 'cat /etc/os-release | grep PRETTY_NAME | cut -d"=" -f2 | tr -d \'"\' || echo "Unknown"',
'kernel': 'uname -r',
'cpu_info': 'lscpu | grep "Model name" | cut -d":" -f2 | xargs || echo "Unknown"',
'cpu_cores': 'nproc',
'memory_total': 'free -h | grep Mem | awk "{print \$2}"',
'memory_available': 'free -h | grep Mem | awk "{print \$7}"',
'disk_usage': 'df -h / | tail -1 | awk "{print \$3\"/\"\$2\" (\"\$5\")\"}"',
'uptime': 'uptime | awk "{print \$3\" \"\$4}" | sed "s/,//"',
'java_version': 'java -version 2>&1 | head -1 | cut -d\'"\' -f2 || echo "Not installed"'
}
system_info = []
for node in self.nodes:
node_info = {'node': node}
for key, command in commands.items():
result = self.execute_command(node, command)
if result['success']:
node_info[key] = result['stdout']
else:
node_info[key] = 'Error: ' + result['stderr']
system_info.append(node_info)
return system_info
def print_results(self, results: List[Dict[str, Any]], show_details: bool = True):
"""
打印执行结果
Args:
results: 执行结果列表
show_details: 是否显示详细信息
"""
print(f"\n=== 执行结果 ({len(results)}个节点) ===")
success_count = sum(1 for r in results if r['success'])
failure_count = len(results) - success_count
print(f"成功: {success_count}, 失败: {failure_count}")
if show_details:
for result in results:
status = "✓" if result['success'] else "✗"
print(f"\n{status} {result['node']}:")
if result['success'] and result.get('stdout'):
print(f" 输出: {result['stdout']}")
if not result['success'] and result.get('stderr'):
print(f" 错误: {result['stderr']}")
if 'execution_time' in result:
print(f" 耗时: {result['execution_time']}秒")
# 使用示例
if __name__ == "__main__":
# 定义集群节点
nodes = ["master1", "master2", "worker1", "worker2", "worker3"]
# 创建SSH管理器
ssh_manager = SSHManager(nodes, user="hadoop")
# 检查连通性
print("=== 检查节点连通性 ===")
connectivity = ssh_manager.check_connectivity()
print(f"可达节点: {connectivity['reachable_nodes']}/{connectivity['total_nodes']}")
if connectivity['unreachable_list']:
print(f"不可达节点: {', '.join(connectivity['unreachable_list'])}")
# 获取系统信息
print("\n=== 获取系统信息 ===")
system_info = ssh_manager.get_system_info()
for info in system_info:
print(f"\n节点: {info['node']}")
print(f" 主机名: {info['hostname']}")
print(f" 操作系统: {info['os_release']}")
print(f" 内核版本: {info['kernel']}")
print(f" CPU: {info['cpu_info']} ({info['cpu_cores']} cores)")
print(f" 内存: {info['memory_available']}/{info['memory_total']}")
print(f" 磁盘使用: {info['disk_usage']}")
print(f" 运行时间: {info['uptime']}")
print(f" Java版本: {info['java_version']}")
# 执行批量命令
print("\n=== 执行批量命令 ===")
results = ssh_manager.execute_on_all("date")
ssh_manager.print_results(results)
2.4 Hadoop下载与安装
2.4.1 下载Hadoop
#!/bin/bash
# Hadoop下载和安装脚本
# 配置参数
HADOOP_VERSION="3.3.4"
HADOOP_USER="hadoop"
HADOOP_HOME="/opt/hadoop"
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
HADOOP_DATA_DIR="/data/hadoop"
HADOOP_LOG_DIR="/var/log/hadoop"
# 下载Hadoop
download_hadoop() {
echo "下载Hadoop $HADOOP_VERSION..."
# 创建临时下载目录
mkdir -p /tmp/hadoop-install
cd /tmp/hadoop-install
# 下载Hadoop二进制包
HADOOP_URL="https://archive.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz"
if [ ! -f "hadoop-$HADOOP_VERSION.tar.gz" ]; then
echo "正在下载: $HADOOP_URL"
wget "$HADOOP_URL" || {
echo "下载失败,尝试使用镜像站点"
# 使用清华大学镜像
MIRROR_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz"
wget "$MIRROR_URL" || {
echo "下载失败,请手动下载Hadoop包"
return 1
}
}
else
echo "Hadoop包已存在,跳过下载"
fi
# 验证下载的文件
if [ -f "hadoop-$HADOOP_VERSION.tar.gz" ]; then
echo "✓ Hadoop下载完成"
return 0
else
echo "✗ Hadoop下载失败"
return 1
fi
}
# 创建Hadoop用户
create_hadoop_user() {
echo "创建Hadoop用户..."
# 检查用户是否已存在
if id "$HADOOP_USER" &>/dev/null; then
echo "用户 $HADOOP_USER 已存在"
return 0
fi
# 创建用户组
sudo groupadd hadoop 2>/dev/null || true
# 创建用户
sudo useradd -g hadoop -m -s /bin/bash "$HADOOP_USER"
# 设置密码(可选)
echo "请为用户 $HADOOP_USER 设置密码:"
sudo passwd "$HADOOP_USER"
# 添加sudo权限
echo "$HADOOP_USER ALL=(ALL) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/hadoop
echo "✓ Hadoop用户创建完成"
}
# 安装Hadoop
install_hadoop() {
echo "安装Hadoop..."
# 创建数据和日志目录
# 注意:不要预先创建 $HADOOP_HOME 目录,否则后面的 ln -sf 会把符号链接建到该目录内部
sudo mkdir -p "$HADOOP_DATA_DIR"
sudo mkdir -p "$HADOOP_LOG_DIR"
# 解压Hadoop
echo "解压Hadoop到 $HADOOP_HOME"
sudo tar -xzf "/tmp/hadoop-install/hadoop-$HADOOP_VERSION.tar.gz" -C /opt/
# 创建符号链接
sudo ln -sf "/opt/hadoop-$HADOOP_VERSION" "$HADOOP_HOME"
# 设置目录权限
sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_HOME"
sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_DATA_DIR"
sudo chown -R "$HADOOP_USER:hadoop" "$HADOOP_LOG_DIR"
# 设置目录权限
sudo chmod 755 "$HADOOP_HOME"
sudo chmod 755 "$HADOOP_DATA_DIR"
sudo chmod 755 "$HADOOP_LOG_DIR"
echo "✓ Hadoop安装完成"
}
# 配置环境变量
setup_environment() {
echo "配置Hadoop环境变量..."
# 创建Hadoop环境变量文件
cat > /tmp/hadoop_env.sh << EOF
# Hadoop Environment Variables
export HADOOP_HOME=$HADOOP_HOME
export HADOOP_CONF_DIR=$HADOOP_CONF_DIR
export HADOOP_DATA_DIR=$HADOOP_DATA_DIR
export HADOOP_LOG_DIR=$HADOOP_LOG_DIR
export HADOOP_CLASSPATH=\$JAVA_HOME/lib/tools.jar
export PATH=\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin:\$PATH
# HDFS Environment
export HDFS_NAMENODE_USER=$HADOOP_USER
export HDFS_DATANODE_USER=$HADOOP_USER
export HDFS_SECONDARYNAMENODE_USER=$HADOOP_USER
# YARN Environment
export YARN_RESOURCEMANAGER_USER=$HADOOP_USER
export YARN_NODEMANAGER_USER=$HADOOP_USER
EOF
# 添加到系统环境变量
sudo cp /tmp/hadoop_env.sh /etc/profile.d/
sudo chmod +x /etc/profile.d/hadoop_env.sh
# 添加到Hadoop用户环境变量
sudo -u "$HADOOP_USER" bash -c "cat /tmp/hadoop_env.sh >> /home/$HADOOP_USER/.bashrc"
echo "✓ 环境变量配置完成"
}
# 验证安装
verify_installation() {
echo "验证Hadoop安装..."
# 切换到Hadoop用户执行验证
sudo -u "$HADOOP_USER" bash -c '
source ~/.bashrc
echo "=== Hadoop安装验证 ==="
echo "HADOOP_HOME: $HADOOP_HOME"
echo "Hadoop版本:"
$HADOOP_HOME/bin/hadoop version
echo "\nHDFS版本:"
$HADOOP_HOME/bin/hdfs version
echo "\nYARN版本:"
$HADOOP_HOME/bin/yarn version
echo "\nMapReduce版本:"
$HADOOP_HOME/bin/mapred version
'
if [ $? -eq 0 ]; then
echo "✓ Hadoop安装验证成功"
else
echo "✗ Hadoop安装验证失败"
return 1
fi
}
# 清理临时文件
cleanup() {
echo "清理临时文件..."
rm -rf /tmp/hadoop-install
rm -f /tmp/hadoop_env.sh
echo "✓ 清理完成"
}
# 主函数
main() {
echo "=== 开始安装Hadoop $HADOOP_VERSION ==="
# 检查是否以root用户运行
if [ "$EUID" -ne 0 ]; then
echo "请以root用户运行此脚本"
exit 1
fi
# 执行安装步骤
download_hadoop || exit 1
create_hadoop_user || exit 1
install_hadoop || exit 1
setup_environment || exit 1
verify_installation || exit 1
cleanup
echo "=== Hadoop安装完成 ==="
echo "请使用以下命令切换到Hadoop用户:"
echo "su - $HADOOP_USER"
echo "然后执行: source ~/.bashrc"
}
main
2.4.2 目录结构说明
#!/bin/bash
# Hadoop目录结构说明和创建脚本
# 显示Hadoop目录结构
show_hadoop_structure() {
echo "=== Hadoop目录结构 ==="
cat << 'EOF'
Hadoop安装目录结构:
/opt/hadoop/ # Hadoop安装根目录
├── bin/ # 可执行文件目录
│ ├── hadoop # Hadoop主命令
│ ├── hdfs # HDFS命令
│ ├── yarn # YARN命令
│ └── mapred # MapReduce命令
├── sbin/ # 系统管理脚本
│ ├── start-dfs.sh # 启动HDFS
│ ├── stop-dfs.sh # 停止HDFS
│ ├── start-yarn.sh # 启动YARN
│ ├── stop-yarn.sh # 停止YARN
│ └── start-all.sh # 启动所有服务
├── etc/hadoop/ # 配置文件目录
│ ├── core-site.xml # 核心配置
│ ├── hdfs-site.xml # HDFS配置
│ ├── yarn-site.xml # YARN配置
│ ├── mapred-site.xml # MapReduce配置
│ ├── hadoop-env.sh # 环境变量配置
│ ├── workers # Worker节点列表
│ └── log4j.properties # 日志配置
├── lib/ # 库文件目录
├── libexec/ # 内部脚本目录
├── logs/ # 日志文件目录
├── share/ # 共享文件目录
│ ├── hadoop/ # Hadoop核心JAR包
│ └── doc/ # 文档目录
└── include/ # 头文件目录
数据目录结构:
/data/hadoop/ # Hadoop数据根目录
├── hdfs/ # HDFS数据目录
│ ├── namenode/ # NameNode数据目录
│ ├── datanode/ # DataNode数据目录
│ └── secondary/ # Secondary NameNode数据目录
├── yarn/ # YARN数据目录
│ ├── local/ # 本地目录
│ └── logs/ # 应用日志目录
└── tmp/ # 临时目录
日志目录结构:
/var/log/hadoop/ # Hadoop日志根目录
├── hdfs/ # HDFS日志
├── yarn/ # YARN日志
├── mapreduce/ # MapReduce日志
└── audit/ # 审计日志
EOF
}
# 创建Hadoop数据目录
create_data_directories() {
echo "创建Hadoop数据目录..."
local hadoop_user="hadoop"
local data_dirs=(
"/data/hadoop"
"/data/hadoop/hdfs"
"/data/hadoop/hdfs/namenode"
"/data/hadoop/hdfs/datanode"
"/data/hadoop/hdfs/secondary"
"/data/hadoop/yarn"
"/data/hadoop/yarn/local"
"/data/hadoop/yarn/logs"
"/data/hadoop/tmp"
)
local log_dirs=(
"/var/log/hadoop"
"/var/log/hadoop/hdfs"
"/var/log/hadoop/yarn"
"/var/log/hadoop/mapreduce"
"/var/log/hadoop/audit"
)
# 创建数据目录
for dir in "${data_dirs[@]}"; do
sudo mkdir -p "$dir"
sudo chown "$hadoop_user:hadoop" "$dir"
sudo chmod 755 "$dir"
echo "✓ 创建目录: $dir"
done
# 创建日志目录
for dir in "${log_dirs[@]}"; do
sudo mkdir -p "$dir"
sudo chown "$hadoop_user:hadoop" "$dir"
sudo chmod 755 "$dir"
echo "✓ 创建目录: $dir"
done
echo "✓ 数据目录创建完成"
}
# 设置目录权限
set_directory_permissions() {
echo "设置目录权限..."
local hadoop_user="hadoop"
# 设置Hadoop安装目录权限
sudo chown -R "$hadoop_user:hadoop" /opt/hadoop*
sudo chmod -R 755 /opt/hadoop*/bin
sudo chmod -R 755 /opt/hadoop*/sbin
sudo chmod -R 644 /opt/hadoop*/etc/hadoop/*
sudo chmod 755 /opt/hadoop*/etc/hadoop
# 设置数据目录权限
sudo chown -R "$hadoop_user:hadoop" /data/hadoop
sudo chmod -R 755 /data/hadoop
# 设置日志目录权限
sudo chown -R "$hadoop_user:hadoop" /var/log/hadoop
sudo chmod -R 755 /var/log/hadoop
echo "✓ 目录权限设置完成"
}
# 创建符号链接
create_symlinks() {
echo "创建符号链接..."
# 创建命令符号链接到系统PATH
local commands=("hadoop" "hdfs" "yarn" "mapred")
for cmd in "${commands[@]}"; do
if [ ! -L "/usr/local/bin/$cmd" ]; then
sudo ln -s "/opt/hadoop/bin/$cmd" "/usr/local/bin/$cmd"
echo "✓ 创建符号链接: /usr/local/bin/$cmd"
fi
done
echo "✓ 符号链接创建完成"
}
# 验证目录结构
verify_directory_structure() {
echo "验证目录结构..."
local required_dirs=(
"/opt/hadoop"
"/opt/hadoop/bin"
"/opt/hadoop/sbin"
"/opt/hadoop/etc/hadoop"
"/data/hadoop"
"/data/hadoop/hdfs/namenode"
"/data/hadoop/hdfs/datanode"
"/var/log/hadoop"
)
local missing_dirs=()
for dir in "${required_dirs[@]}"; do
if [ ! -d "$dir" ]; then
missing_dirs+=("$dir")
fi
done
if [ ${#missing_dirs[@]} -eq 0 ]; then
echo "✓ 目录结构验证通过"
return 0
else
echo "✗ 以下目录缺失:"
for dir in "${missing_dirs[@]}"; do
echo " - $dir"
done
return 1
fi
}
# 主函数
main() {
echo "=== Hadoop目录结构配置 ==="
show_hadoop_structure
echo
create_data_directories
set_directory_permissions
create_symlinks
verify_directory_structure
echo "=== 目录结构配置完成 ==="
}
main
2.5 基础配置文件
2.5.1 核心配置文件说明
Hadoop的配置文件位于$HADOOP_HOME/etc/hadoop/目录下,主要包括:
- core-site.xml:核心配置,定义文件系统和RPC设置
- hdfs-site.xml:HDFS配置,定义NameNode和DataNode设置
- yarn-site.xml:YARN配置,定义ResourceManager和NodeManager设置
- mapred-site.xml:MapReduce配置,定义作业执行设置
- hadoop-env.sh:环境变量配置
- workers:Worker节点列表
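作为参考,下面给出一个最小化的core-site.xml示例,仅演示配置文件的基本结构(其中master1和目录路径均为示例值,完整配置请使用2.5.2节的生成工具):
#!/bin/bash
# 生成一个最小化的core-site.xml示例文件(仅用于说明结构,不建议直接用于生产)
cat > /tmp/core-site.xml.example << 'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master1:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/data/hadoop/tmp</value>
    </property>
</configuration>
EOF
echo "示例配置已写入 /tmp/core-site.xml.example"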
2.5.2 配置文件生成工具
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hadoop配置文件生成工具
"""
import os
import xml.etree.ElementTree as ET
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from pathlib import Path
@dataclass
class HadoopConfig:
"""Hadoop配置参数"""
cluster_name: str = "hadoop-cluster"
namenode_host: str = "master1"
namenode_port: int = 9000
namenode_http_port: int = 9870
secondary_namenode_host: str = "master2"
resource_manager_host: str = "master1"
resource_manager_port: int = 8032
resource_manager_webapp_port: int = 8088
history_server_host: str = "master1"
history_server_port: int = 19888
# 目录配置
hadoop_tmp_dir: str = "/data/hadoop/tmp"
namenode_dir: str = "/data/hadoop/hdfs/namenode"
datanode_dir: str = "/data/hadoop/hdfs/datanode"
secondary_namenode_dir: str = "/data/hadoop/hdfs/secondary"
yarn_local_dir: str = "/data/hadoop/yarn/local"
yarn_log_dir: str = "/data/hadoop/yarn/logs"
# 内存配置(MB)
namenode_heap_size: int = 2048
datanode_heap_size: int = 1024
resource_manager_heap_size: int = 2048
node_manager_heap_size: int = 1024
# 副本数配置
replication_factor: int = 3
# Worker节点列表
worker_nodes: Optional[List[str]] = None
def __post_init__(self):
if self.worker_nodes is None:
self.worker_nodes = ["worker1", "worker2", "worker3"]
class HadoopConfigGenerator:
"""Hadoop配置文件生成器"""
def __init__(self, config: HadoopConfig):
self.config = config
def _create_xml_element(self, name: str, value: str, description: str = "") -> ET.Element:
"""
创建XML配置元素
Args:
name: 配置项名称
value: 配置项值
description: 配置项描述
Returns:
ET.Element: XML元素
"""
property_elem = ET.Element("property")
name_elem = ET.SubElement(property_elem, "name")
name_elem.text = name
value_elem = ET.SubElement(property_elem, "value")
value_elem.text = str(value)
if description:
desc_elem = ET.SubElement(property_elem, "description")
desc_elem.text = description
return property_elem
def _format_xml(self, root: ET.Element) -> str:
"""
格式化XML输出
Args:
root: XML根元素
Returns:
str: 格式化的XML字符串
"""
# 添加XML声明和注释
xml_str = '<?xml version="1.0" encoding="UTF-8"?>\n'
xml_str += '<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>\n'
xml_str += '<!--\n'
xml_str += f' Hadoop Configuration for {self.config.cluster_name}\n'
xml_str += ' Generated by HadoopConfigGenerator\n'
xml_str += '-->\n'
# 转换为字符串并格式化
rough_string = ET.tostring(root, encoding='unicode')
# 简单的格式化(添加缩进)
lines = rough_string.split('>')
formatted_lines = []
indent_level = 0
for line in lines[:-1]: # 最后一个元素是空的
line = line + '>'
# 处理缩进
if line.strip().startswith('</'):
indent_level -= 1
formatted_lines.append(' ' * indent_level + line.strip())
if not line.strip().startswith('</') and not line.strip().endswith('/>'):
if '<' in line and not line.strip().startswith('<?'):
indent_level += 1
xml_str += '\n'.join(formatted_lines)
return xml_str
def generate_core_site(self) -> str:
"""
生成core-site.xml配置
Returns:
str: core-site.xml内容
"""
root = ET.Element("configuration")
# 核心配置项
properties = [
("fs.defaultFS", f"hdfs://{self.config.namenode_host}:{self.config.namenode_port}",
"默认文件系统URI"),
("hadoop.tmp.dir", self.config.hadoop_tmp_dir,
"Hadoop临时目录"),
("hadoop.http.staticuser.user", "hadoop",
"Web界面静态用户"),
("fs.trash.interval", "1440",
"回收站保留时间(分钟)"),
("io.file.buffer.size", "131072",
"文件缓冲区大小"),
("hadoop.security.authorization", "false",
"是否启用安全授权"),
("hadoop.security.authentication", "simple",
"认证方式"),
]
for name, value, desc in properties:
root.append(self._create_xml_element(name, value, desc))
return self._format_xml(root)
def generate_hdfs_site(self) -> str:
"""
生成hdfs-site.xml配置
Returns:
str: hdfs-site.xml内容
"""
root = ET.Element("configuration")
# HDFS配置项
properties = [
("dfs.namenode.name.dir", self.config.namenode_dir,
"NameNode数据目录"),
("dfs.datanode.data.dir", self.config.datanode_dir,
"DataNode数据目录"),
("dfs.namenode.checkpoint.dir", self.config.secondary_namenode_dir,
"Secondary NameNode检查点目录"),
("dfs.replication", str(self.config.replication_factor),
"数据副本数"),
("dfs.blocksize", "134217728",
"HDFS块大小(128MB)"),
("dfs.namenode.handler.count", "20",
"NameNode处理线程数"),
("dfs.datanode.handler.count", "10",
"DataNode处理线程数"),
("dfs.namenode.http-address", f"{self.config.namenode_host}:{self.config.namenode_http_port}",
"NameNode Web界面地址"),
("dfs.namenode.secondary.http-address", f"{self.config.secondary_namenode_host}:9868",
"Secondary NameNode Web界面地址"),
("dfs.webhdfs.enabled", "true",
"启用WebHDFS"),
("dfs.permissions.enabled", "false",
"禁用权限检查(开发环境)"),
("dfs.namenode.safemode.threshold-pct", "0.9",
"安全模式阈值"),
("dfs.datanode.max.transfer.threads", "4096",
"DataNode最大传输线程数"),
]
for name, value, desc in properties:
root.append(self._create_xml_element(name, value, desc))
return self._format_xml(root)
def generate_yarn_site(self) -> str:
"""
生成yarn-site.xml配置
Returns:
str: yarn-site.xml内容
"""
root = ET.Element("configuration")
# YARN配置项
properties = [
("yarn.resourcemanager.hostname", self.config.resource_manager_host,
"ResourceManager主机名"),
("yarn.resourcemanager.address", f"{self.config.resource_manager_host}:{self.config.resource_manager_port}",
"ResourceManager地址"),
("yarn.resourcemanager.webapp.address", f"{self.config.resource_manager_host}:{self.config.resource_manager_webapp_port}",
"ResourceManager Web界面地址"),
("yarn.nodemanager.aux-services", "mapreduce_shuffle",
"NodeManager辅助服务"),
("yarn.nodemanager.aux-services.mapreduce_shuffle.class",
"org.apache.hadoop.mapred.ShuffleHandler",
"Shuffle服务类"),
("yarn.nodemanager.local-dirs", self.config.yarn_local_dir,
"NodeManager本地目录"),
("yarn.nodemanager.log-dirs", self.config.yarn_log_dir,
"NodeManager日志目录"),
("yarn.nodemanager.resource.memory-mb", "4096",
"NodeManager可用内存(MB)"),
("yarn.nodemanager.resource.cpu-vcores", "4",
"NodeManager可用CPU核心数"),
("yarn.scheduler.maximum-allocation-mb", "4096",
"单个容器最大内存(MB)"),
("yarn.scheduler.minimum-allocation-mb", "512",
"单个容器最小内存(MB)"),
("yarn.app.mapreduce.am.resource.mb", "1024",
"ApplicationMaster内存(MB)"),
("yarn.log-aggregation-enable", "true",
"启用日志聚合"),
("yarn.log.server.url", f"http://{self.config.history_server_host}:{self.config.history_server_port}/jobhistory/logs",
"日志服务器URL"),
]
for name, value, desc in properties:
root.append(self._create_xml_element(name, value, desc))
return self._format_xml(root)
def generate_mapred_site(self) -> str:
"""
生成mapred-site.xml配置
Returns:
str: mapred-site.xml内容
"""
root = ET.Element("configuration")
# MapReduce配置项
properties = [
("mapreduce.framework.name", "yarn",
"MapReduce框架名称"),
("mapreduce.jobhistory.address", f"{self.config.history_server_host}:10020",
"JobHistory服务地址"),
("mapreduce.jobhistory.webapp.address", f"{self.config.history_server_host}:{self.config.history_server_port}",
"JobHistory Web界面地址"),
("mapreduce.map.memory.mb", "1024",
"Map任务内存(MB)"),
("mapreduce.reduce.memory.mb", "2048",
"Reduce任务内存(MB)"),
("mapreduce.map.java.opts", "-Xmx819m",
"Map任务JVM参数"),
("mapreduce.reduce.java.opts", "-Xmx1638m",
"Reduce任务JVM参数"),
("mapreduce.task.io.sort.mb", "256",
"排序缓冲区大小(MB)"),
("mapreduce.task.io.sort.factor", "100",
"排序合并因子"),
("mapreduce.reduce.shuffle.parallelcopies", "10",
"Shuffle并行复制数"),
]
for name, value, desc in properties:
root.append(self._create_xml_element(name, value, desc))
return self._format_xml(root)
def generate_hadoop_env(self) -> str:
"""
生成hadoop-env.sh配置
Returns:
str: hadoop-env.sh内容
"""
env_content = f'''#!/bin/bash
# Hadoop Environment Configuration
# Generated by HadoopConfigGenerator
# Java配置
export JAVA_HOME={os.environ.get('JAVA_HOME', '/usr/lib/jvm/java-8-openjdk-amd64')}
# Hadoop配置
export HADOOP_HOME={os.environ.get('HADOOP_HOME', '/opt/hadoop')}
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_LOG_DIR=/var/log/hadoop
# HDFS配置
export HDFS_NAMENODE_USER=hadoop
export HDFS_DATANODE_USER=hadoop
export HDFS_SECONDARYNAMENODE_USER=hadoop
# YARN配置
export YARN_RESOURCEMANAGER_USER=hadoop
export YARN_NODEMANAGER_USER=hadoop
# MapReduce配置
export MAPRED_HISTORYSERVER_USER=hadoop
# JVM堆大小配置
export HADOOP_NAMENODE_OPTS="-Xmx{self.config.namenode_heap_size}m -Dhadoop.security.logger=INFO,RFAS"
export HADOOP_DATANODE_OPTS="-Xmx{self.config.datanode_heap_size}m -Dhadoop.security.logger=ERROR,RFAS"
export YARN_RESOURCEMANAGER_OPTS="-Xmx{self.config.resource_manager_heap_size}m"
export YARN_NODEMANAGER_OPTS="-Xmx{self.config.node_manager_heap_size}m"
# 其他配置
export HADOOP_PID_DIR=/var/run/hadoop
export HADOOP_SECURE_DN_PID_DIR=/var/run/hadoop
export HADOOP_IDENT_STRING=hadoop
# 日志配置
export HADOOP_ROOT_LOGGER=INFO,console
export HADOOP_SECURITY_LOGGER=INFO,NullAppender
# 本地库路径
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"
'''
return env_content
def generate_workers(self) -> str:
"""
生成workers文件
Returns:
str: workers文件内容
"""
return '\n'.join(self.config.worker_nodes) + '\n'
def generate_all_configs(self, output_dir: str) -> Dict[str, str]:
"""
生成所有配置文件
Args:
output_dir: 输出目录
Returns:
Dict[str, str]: 配置文件路径映射
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
configs = {
'core-site.xml': self.generate_core_site(),
'hdfs-site.xml': self.generate_hdfs_site(),
'yarn-site.xml': self.generate_yarn_site(),
'mapred-site.xml': self.generate_mapred_site(),
'hadoop-env.sh': self.generate_hadoop_env(),
'workers': self.generate_workers()
}
file_paths = {}
for filename, content in configs.items():
file_path = output_path / filename
with open(file_path, 'w', encoding='utf-8') as f:
f.write(content)
# 设置hadoop-env.sh为可执行
if filename == 'hadoop-env.sh':
os.chmod(file_path, 0o755)
file_paths[filename] = str(file_path)
print(f"✓ 生成配置文件: {file_path}")
return file_paths
def validate_config(self) -> Dict[str, Any]:
"""
验证配置参数
Returns:
Dict[str, Any]: 验证结果
"""
validation_result = {
'valid': True,
'errors': [],
'warnings': []
}
# 检查必需的主机名
if not self.config.namenode_host:
validation_result['valid'] = False
validation_result['errors'].append("NameNode主机名不能为空")
if not self.config.resource_manager_host:
validation_result['valid'] = False
validation_result['errors'].append("ResourceManager主机名不能为空")
# 检查端口范围
ports = [
('namenode_port', self.config.namenode_port),
('namenode_http_port', self.config.namenode_http_port),
('resource_manager_port', self.config.resource_manager_port),
('resource_manager_webapp_port', self.config.resource_manager_webapp_port),
('history_server_port', self.config.history_server_port)
]
for port_name, port_value in ports:
if not (1024 <= port_value <= 65535):
validation_result['valid'] = False
validation_result['errors'].append(f"{port_name} 端口 {port_value} 超出有效范围 (1024-65535)")
# 检查内存配置
memory_configs = [
('namenode_heap_size', self.config.namenode_heap_size),
('datanode_heap_size', self.config.datanode_heap_size),
('resource_manager_heap_size', self.config.resource_manager_heap_size),
('node_manager_heap_size', self.config.node_manager_heap_size)
]
for mem_name, mem_value in memory_configs:
if mem_value < 512:
validation_result['warnings'].append(f"{mem_name} 内存配置 {mem_value}MB 可能过小")
elif mem_value > 8192:
validation_result['warnings'].append(f"{mem_name} 内存配置 {mem_value}MB 可能过大")
# 检查副本数
if self.config.replication_factor < 1:
validation_result['valid'] = False
validation_result['errors'].append("副本数不能小于1")
elif self.config.replication_factor > len(self.config.worker_nodes):
validation_result['warnings'].append(f"副本数 {self.config.replication_factor} 大于Worker节点数 {len(self.config.worker_nodes)}")
# 检查Worker节点
if not self.config.worker_nodes:
validation_result['valid'] = False
validation_result['errors'].append("Worker节点列表不能为空")
return validation_result
# 使用示例
if __name__ == "__main__":
# 创建配置对象
config = HadoopConfig(
cluster_name="my-hadoop-cluster",
namenode_host="master1",
secondary_namenode_host="master2",
resource_manager_host="master1",
history_server_host="master1",
worker_nodes=["worker1", "worker2", "worker3"],
replication_factor=3
)
# 创建配置生成器
generator = HadoopConfigGenerator(config)
# 验证配置
validation = generator.validate_config()
print("=== 配置验证结果 ===")
print(f"配置有效: {validation['valid']}")
if validation['errors']:
print("错误:")
for error in validation['errors']:
print(f" - {error}")
if validation['warnings']:
print("警告:")
for warning in validation['warnings']:
print(f" - {warning}")
if validation['valid']:
# 生成配置文件
print("\n=== 生成配置文件 ===")
output_dir = "/tmp/hadoop-config"
file_paths = generator.generate_all_configs(output_dir)
print(f"\n配置文件已生成到: {output_dir}")
for filename, filepath in file_paths.items():
print(f" {filename}: {filepath}")
2.5.3 配置文件部署
#!/bin/bash
# Hadoop配置文件部署脚本
# 配置参数
HADOOP_USER="hadoop"
HADOOP_HOME="/opt/hadoop"
HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
CONFIG_SOURCE_DIR="/tmp/hadoop-config"
NODES=("master1" "master2" "worker1" "worker2" "worker3")
# 备份现有配置
backup_configs() {
echo "备份现有配置文件..."
local backup_dir="$HADOOP_CONF_DIR/backup-$(date +%Y%m%d-%H%M%S)"
if [ -d "$HADOOP_CONF_DIR" ]; then
sudo -u "$HADOOP_USER" mkdir -p "$backup_dir"
sudo -u "$HADOOP_USER" cp -r "$HADOOP_CONF_DIR"/* "$backup_dir"/
echo "✓ 配置文件已备份到: $backup_dir"
else
echo "配置目录不存在,跳过备份"
fi
}
# 部署配置文件到本地
deploy_local_configs() {
echo "部署配置文件到本地..."
local config_files=("core-site.xml" "hdfs-site.xml" "yarn-site.xml" "mapred-site.xml" "hadoop-env.sh" "workers")
for config_file in "${config_files[@]}"; do
local source_file="$CONFIG_SOURCE_DIR/$config_file"
local target_file="$HADOOP_CONF_DIR/$config_file"
if [ -f "$source_file" ]; then
sudo -u "$HADOOP_USER" cp "$source_file" "$target_file"
# 设置权限
if [ "$config_file" = "hadoop-env.sh" ]; then
sudo chmod 755 "$target_file"
else
sudo chmod 644 "$target_file"
fi
echo "✓ 部署配置文件: $config_file"
else
echo "✗ 配置文件不存在: $source_file"
fi
done
}
# 分发配置文件到集群
distribute_configs() {
echo "分发配置文件到集群节点..."
local current_node=$(hostname)
for node in "${NODES[@]}"; do
if [ "$node" != "$current_node" ]; then
echo "正在分发配置到节点: $node"
# 创建远程配置目录
ssh "$HADOOP_USER@$node" "mkdir -p $HADOOP_CONF_DIR"
# 复制配置文件
scp -r "$HADOOP_CONF_DIR"/* "$HADOOP_USER@$node:$HADOOP_CONF_DIR/"
if [ $? -eq 0 ]; then
echo "✓ 配置文件已分发到: $node"
else
echo "✗ 配置文件分发失败: $node"
fi
fi
done
}
# 验证配置文件
validate_configs() {
echo "验证配置文件..."
local config_files=("core-site.xml" "hdfs-site.xml" "yarn-site.xml" "mapred-site.xml")
local validation_passed=true
for config_file in "${config_files[@]}"; do
local config_path="$HADOOP_CONF_DIR/$config_file"
if [ -f "$config_path" ]; then
# 检查XML格式
if xmllint --noout "$config_path" 2>/dev/null; then
echo "✓ $config_file 格式正确"
else
echo "✗ $config_file 格式错误"
validation_passed=false
fi
else
echo "✗ $config_file 文件不存在"
validation_passed=false
fi
done
# 检查hadoop-env.sh
if [ -f "$HADOOP_CONF_DIR/hadoop-env.sh" ]; then
if bash -n "$HADOOP_CONF_DIR/hadoop-env.sh" 2>/dev/null; then
echo "✓ hadoop-env.sh 语法正确"
else
echo "✗ hadoop-env.sh 语法错误"
validation_passed=false
fi
else
echo "✗ hadoop-env.sh 文件不存在"
validation_passed=false
fi
# 检查workers文件
if [ -f "$HADOOP_CONF_DIR/workers" ]; then
echo "✓ workers 文件存在"
echo "Worker节点列表:"
cat "$HADOOP_CONF_DIR/workers" | while read -r worker; do
echo " - $worker"
done
else
echo "✗ workers 文件不存在"
validation_passed=false
fi
if [ "$validation_passed" = true ]; then
echo "✓ 所有配置文件验证通过"
return 0
else
echo "✗ 配置文件验证失败"
return 1
fi
}
# 显示配置摘要
show_config_summary() {
echo "=== 配置摘要 ==="
# 从core-site.xml提取信息
if [ -f "$HADOOP_CONF_DIR/core-site.xml" ]; then
local fs_default=$(xmllint --xpath "//property[name='fs.defaultFS']/value/text()" "$HADOOP_CONF_DIR/core-site.xml" 2>/dev/null)
echo "默认文件系统: $fs_default"
fi
# 从hdfs-site.xml提取信息
if [ -f "$HADOOP_CONF_DIR/hdfs-site.xml" ]; then
local replication=$(xmllint --xpath "//property[name='dfs.replication']/value/text()" "$HADOOP_CONF_DIR/hdfs-site.xml" 2>/dev/null)
echo "副本数: $replication"
local namenode_http=$(xmllint --xpath "//property[name='dfs.namenode.http-address']/value/text()" "$HADOOP_CONF_DIR/hdfs-site.xml" 2>/dev/null)
echo "NameNode Web界面: http://$namenode_http"
fi
# 从yarn-site.xml提取信息
if [ -f "$HADOOP_CONF_DIR/yarn-site.xml" ]; then
local rm_webapp=$(xmllint --xpath "//property[name='yarn.resourcemanager.webapp.address']/value/text()" "$HADOOP_CONF_DIR/yarn-site.xml" 2>/dev/null)
echo "ResourceManager Web界面: http://$rm_webapp"
fi
# 从mapred-site.xml提取信息
if [ -f "$HADOOP_CONF_DIR/mapred-site.xml" ]; then
local history_webapp=$(xmllint --xpath "//property[name='mapreduce.jobhistory.webapp.address']/value/text()" "$HADOOP_CONF_DIR/mapred-site.xml" 2>/dev/null)
echo "JobHistory Web界面: http://$history_webapp"
fi
}
# 主函数
main() {
echo "=== 开始部署Hadoop配置文件 ==="
# 检查配置源目录
if [ ! -d "$CONFIG_SOURCE_DIR" ]; then
echo "错误: 配置源目录不存在: $CONFIG_SOURCE_DIR"
echo "请先使用配置生成工具生成配置文件"
exit 1
fi
# 检查是否以hadoop用户运行
if [ "$(whoami)" != "$HADOOP_USER" ]; then
echo "请以 $HADOOP_USER 用户运行此脚本"
exit 1
fi
# 执行部署步骤
backup_configs
deploy_local_configs
validate_configs || exit 1
distribute_configs
show_config_summary
echo "=== 配置文件部署完成 ==="
echo "请在所有节点上验证配置文件是否正确部署"
}
main
本节小结
本节详细介绍了Hadoop的安装与配置过程,包括:
- 环境准备:系统要求、集群规划、节点角色分配
- Java环境配置:JDK安装、环境变量设置、验证
- SSH免密登录配置:密钥生成分发、批量操作工具
- Hadoop下载与安装:下载、用户创建、安装、目录结构
- 基础配置文件:配置文件说明、生成工具、部署脚本
通过这些步骤和工具,可以快速完成Hadoop集群的基础安装和配置。下一节将介绍具体的集群部署模式和启动过程。