1. 环境准备
1.1 系统要求
import os
import platform
import re
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
class OperatingSystem(Enum):
    """Enumeration of operating system identifiers."""

    LINUX = "linux"
    WINDOWS = "windows"
    MACOS = "macos"
class DeploymentMode(Enum):
    """Enumeration of the HBase deployment modes used in this file."""

    STANDALONE = "standalone"
    PSEUDO_DISTRIBUTED = "pseudo_distributed"
    FULLY_DISTRIBUTED = "fully_distributed"
@dataclass
class SystemRequirement:
    """One software prerequisite entry (component plus version bounds)."""

    # Human-readable component name, e.g. "Java JDK".
    component: str
    # Lowest acceptable version, kept as free-form text.
    minimum_version: str
    # Version the author recommends, kept as free-form text.
    recommended_version: str
    # Whether the component is mandatory.
    required: bool
    # Short explanation of why the component is needed.
    description: str
@dataclass
class HardwareRequirement:
    """Hardware sizing entry associated with one deployment mode."""

    # Number of CPU cores.
    cpu_cores: int
    # RAM in gigabytes.
    memory_gb: int
    # Disk capacity in gigabytes.
    disk_space_gb: int
    # Free-form bandwidth label such as "1Gbps".
    network_bandwidth: str
    # Deployment mode this sizing applies to.
    deployment_mode: DeploymentMode
class EnvironmentChecker:
    """Check the local machine against the HBase prerequisites defined here.

    Two static tables are built at construction time:
    ``system_requirements`` (software components) and
    ``hardware_requirements`` (one sizing entry per deployment mode).
    """

    # Lowest Java major version accepted by check_java_version().
    _MIN_JAVA_MAJOR = 8

    def __init__(self):
        self.system_requirements = self._initialize_system_requirements()
        self.hardware_requirements = self._initialize_hardware_requirements()

    def _initialize_system_requirements(self) -> List[SystemRequirement]:
        """Return the static list of software prerequisites."""
        return [
            SystemRequirement(
                component="Java JDK",
                minimum_version="8",
                recommended_version="11",
                required=True,
                description="HBase运行需要Java环境",
            ),
            SystemRequirement(
                component="Hadoop",
                minimum_version="2.7.0",
                recommended_version="3.3.0",
                required=True,
                description="HBase依赖Hadoop HDFS和部分库",
            ),
            SystemRequirement(
                component="ZooKeeper",
                minimum_version="3.4.0",
                recommended_version="3.7.0",
                required=True,
                description="集群协调服务",
            ),
            SystemRequirement(
                component="SSH",
                minimum_version="Any",
                recommended_version="OpenSSH 7.0+",
                required=True,
                description="集群节点间通信",
            ),
        ]

    def _initialize_hardware_requirements(self) -> List[HardwareRequirement]:
        """Return the static hardware sizing table, one entry per mode."""
        return [
            HardwareRequirement(
                cpu_cores=2,
                memory_gb=4,
                disk_space_gb=20,
                network_bandwidth="100Mbps",
                deployment_mode=DeploymentMode.STANDALONE,
            ),
            HardwareRequirement(
                cpu_cores=4,
                memory_gb=8,
                disk_space_gb=100,
                network_bandwidth="1Gbps",
                deployment_mode=DeploymentMode.PSEUDO_DISTRIBUTED,
            ),
            HardwareRequirement(
                cpu_cores=8,
                memory_gb=16,
                disk_space_gb=500,
                network_bandwidth="10Gbps",
                deployment_mode=DeploymentMode.FULLY_DISTRIBUTED,
            ),
        ]

    @staticmethod
    def _java_failure(message: str) -> Dict[str, Any]:
        """Build the result returned when Java is missing or unparseable."""
        return {
            "installed": False,
            "error": message,
            "meets_requirement": False,
        }

    def check_java_version(self) -> Dict[str, Any]:
        """Run ``java -version`` and report the detected version.

        Returns:
            On success a dict with the keys ``installed`` (True),
            ``version``, ``major_version``, ``meets_requirement`` and
            ``output``. When ``java`` cannot be executed or its output
            cannot be parsed, a dict with ``installed`` False, ``error``
            and ``meets_requirement`` False. The method never returns None.
        """
        try:
            # capture_output=True may not be combined with an explicit
            # stderr argument (subprocess raises ValueError), so the two
            # streams are merged by hand instead.
            completed = subprocess.run(
                ["java", "-version"],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
        except (OSError, subprocess.SubprocessError) as e:
            return self._java_failure(str(e))

        output = completed.stdout or ""
        # Search the whole output rather than only the first line: extra
        # lines may precede the version banner.
        quoted = re.search(r'version "([^"]+)"', output)
        if quoted is None:
            return self._java_failure(f"无法解析Java版本: {output.strip()}")

        version_str = quoted.group(1)
        fields = version_str.split(".")
        # Legacy scheme "1.8.0_292" carries the major version in field two.
        if fields[0] == "1" and len(fields) > 1:
            major_field = fields[1]
        else:
            major_field = fields[0]
        # Tolerate suffixes such as "21-ea" by keeping the leading digits.
        digits = re.match(r"\d+", major_field)
        if digits is None:
            return self._java_failure(f"无法解析Java版本: {version_str}")

        major_version = int(digits.group())
        return {
            "installed": True,
            "version": version_str,
            "major_version": major_version,
            "meets_requirement": major_version >= self._MIN_JAVA_MAJOR,
            "output": output,
        }

    def check_system_resources(self) -> Dict[str, Any]:
        """Collect CPU count, total memory, total disk size and OS name.

        Requires the third-party ``psutil`` package, imported lazily here.
        Disk figures are taken from the filesystem mounted at ``/``.
        """
        import psutil

        bytes_per_gb = 1024 ** 3
        return {
            "cpu_cores": psutil.cpu_count(),
            "memory_gb": round(psutil.virtual_memory().total / bytes_per_gb, 2),
            "disk_space_gb": round(psutil.disk_usage('/').total / bytes_per_gb, 2),
            "operating_system": platform.system().lower(),
        }

    def recommend_deployment_mode(self, resources: Dict[str, Any]) -> Dict[str, Any]:
        """Pick the most demanding deployment mode the resources satisfy.

        Args:
            resources: Dict with ``cpu_cores``, ``memory_gb`` and
                ``disk_space_gb``; missing keys count as 0.

        Returns:
            Dict with ``recommended_mode`` (mode value or None),
            ``suitability``, ``requirements_met`` and ``suggestions``.
        """
        cpu_cores = resources.get('cpu_cores', 0)
        memory_gb = resources.get('memory_gb', 0)
        disk_space_gb = resources.get('disk_space_gb', 0)

        # Tiers ordered from most to least demanding; first match wins.
        tiers = (
            (8, 16, 500, DeploymentMode.FULLY_DISTRIBUTED, "excellent"),
            (4, 8, 100, DeploymentMode.PSEUDO_DISTRIBUTED, "good"),
            (2, 4, 20, DeploymentMode.STANDALONE, "basic"),
        )
        recommended_mode = None
        suitability = "insufficient"
        for min_cpu, min_mem, min_disk, mode, label in tiers:
            if cpu_cores >= min_cpu and memory_gb >= min_mem and disk_space_gb >= min_disk:
                recommended_mode = mode
                suitability = label
                break

        return {
            "recommended_mode": recommended_mode.value if recommended_mode else None,
            "suitability": suitability,
            "requirements_met": recommended_mode is not None,
            "suggestions": self._get_upgrade_suggestions(resources),
        }

    def _get_upgrade_suggestions(self, resources: Dict[str, Any]) -> List[str]:
        """Return upgrade hints for every resource below 4 cores / 8 GB / 100 GB."""
        suggestions = []
        if resources.get('cpu_cores', 0) < 4:
            suggestions.append("建议升级到至少4核CPU")
        if resources.get('memory_gb', 0) < 8:
            suggestions.append("建议增加内存到至少8GB")
        if resources.get('disk_space_gb', 0) < 100:
            suggestions.append("建议增加磁盘空间到至少100GB")
        return suggestions
# Usage example: runs at import time and prints each result.
env_checker = EnvironmentChecker()
# Check the installed Java version
java_check = env_checker.check_java_version()
print(f"Java检查结果: {java_check}")
# Collect CPU / memory / disk figures (check_system_resources imports psutil)
resources = env_checker.check_system_resources()
print(f"系统资源: {resources}")
# Recommend a deployment mode for the measured resources
recommendation = env_checker.recommend_deployment_mode(resources)
print(f"部署模式推荐: {recommendation}")
1.2 网络配置
class NetworkProtocol(Enum):
    """Enumeration of network protocol labels attached to ports."""

    TCP = "tcp"
    HTTP = "http"
    HTTPS = "https"
class ServiceType(Enum):
    """Enumeration of the cluster services that own network ports."""

    HMASTER = "hmaster"
    REGIONSERVER = "regionserver"
    ZOOKEEPER = "zookeeper"
    HDFS_NAMENODE = "hdfs_namenode"
    HDFS_DATANODE = "hdfs_datanode"
@dataclass
class NetworkPort:
    """A single port used by one service."""

    # Service that owns the port.
    service: ServiceType
    # Port number.
    port: int
    # Protocol label for the port.
    protocol: NetworkProtocol
    # Short human-readable description.
    description: str
    # Whether the port is mandatory; defaults to True.
    required: bool = True
@dataclass
class NetworkConfiguration:
    """Network settings of one host: identity, ports and firewall rules."""

    # Host name of the node.
    hostname: str
    # IP address of the node, stored as text.
    ip_address: str
    # Ports associated with the node.
    ports: List[NetworkPort]
    # Firewall rules, stored as plain strings.
    firewall_rules: List[str]
class NetworkConfigurator:
    """Generate network-related setup material for an HBase cluster.

    Holds a static table of default ports and per-distribution firewall
    commands, and offers helpers that render a hosts file, SSH setup
    commands and a TCP connectivity probe.
    """

    def __init__(self):
        self.default_ports = self._initialize_default_ports()
        self.firewall_commands = self._initialize_firewall_commands()

    def _initialize_default_ports(self) -> List[NetworkPort]:
        """Return the static table of default HBase and ZooKeeper ports."""
        port_table = [
            (ServiceType.HMASTER, 16000, NetworkProtocol.TCP, "HMaster RPC端口"),
            (ServiceType.HMASTER, 16010, NetworkProtocol.HTTP, "HMaster Web UI端口"),
            (ServiceType.REGIONSERVER, 16020, NetworkProtocol.TCP, "RegionServer RPC端口"),
            (ServiceType.REGIONSERVER, 16030, NetworkProtocol.HTTP, "RegionServer Web UI端口"),
            (ServiceType.ZOOKEEPER, 2181, NetworkProtocol.TCP, "ZooKeeper客户端端口"),
            (ServiceType.ZOOKEEPER, 2888, NetworkProtocol.TCP, "ZooKeeper Follower端口"),
            (ServiceType.ZOOKEEPER, 3888, NetworkProtocol.TCP, "ZooKeeper Leader选举端口"),
        ]
        return [
            NetworkPort(service=service, port=port, protocol=protocol, description=description)
            for service, port, protocol, description in port_table
        ]

    def _initialize_firewall_commands(self) -> Dict[str, List[str]]:
        """Return firewall commands keyed by distribution ("ubuntu", "centos")."""
        return {
            "ubuntu": [
                "sudo ufw allow 16000/tcp",  # HMaster RPC
                "sudo ufw allow 16010/tcp",  # HMaster Web UI
                "sudo ufw allow 16020/tcp",  # RegionServer RPC
                "sudo ufw allow 16030/tcp",  # RegionServer Web UI
                "sudo ufw allow 2181/tcp",   # ZooKeeper
                "sudo ufw allow 2888/tcp",   # ZooKeeper Follower
                "sudo ufw allow 3888/tcp",   # ZooKeeper Leader
                "sudo ufw allow 9000/tcp",   # HDFS NameNode
                "sudo ufw allow 9864/tcp",   # HDFS DataNode
                "sudo ufw reload",
            ],
            "centos": [
                "sudo firewall-cmd --permanent --add-port=16000/tcp",
                "sudo firewall-cmd --permanent --add-port=16010/tcp",
                "sudo firewall-cmd --permanent --add-port=16020/tcp",
                "sudo firewall-cmd --permanent --add-port=16030/tcp",
                "sudo firewall-cmd --permanent --add-port=2181/tcp",
                "sudo firewall-cmd --permanent --add-port=2888/tcp",
                "sudo firewall-cmd --permanent --add-port=3888/tcp",
                "sudo firewall-cmd --permanent --add-port=9000/tcp",
                "sudo firewall-cmd --permanent --add-port=9864/tcp",
                "sudo firewall-cmd --reload",
            ],
        }

    def generate_hosts_file(self, cluster_nodes: List[Dict[str, str]]) -> str:
        """Render hosts-file text for the given nodes.

        Args:
            cluster_nodes: Dicts that each provide ``ip`` and ``hostname``.

        Returns:
            A header comment line followed by one ``ip<TAB>hostname`` line
            per node; every line ends with a newline.
        """
        lines = ["# HBase Cluster Hosts\n"]
        lines.extend(f"{node['ip']}\t{node['hostname']}\n" for node in cluster_nodes)
        return "".join(lines)

    def generate_ssh_config(self, cluster_nodes: List[Dict[str, str]],
                            ssh_user: str = "hadoop") -> Dict[str, List[str]]:
        """Build the shell commands for passwordless SSH between nodes.

        Args:
            cluster_nodes: Dicts that each provide ``hostname``.
            ssh_user: Account used in the ``ssh-copy-id`` commands.

        Returns:
            Dict with ``setup_commands`` and ``test_commands``, both lists
            of strings. Nothing is executed.
        """
        ssh_commands = [
            "# 生成SSH密钥对",
            "ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa",
            "",
            "# 将公钥添加到authorized_keys",
            "cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys",
            "chmod 0600 ~/.ssh/authorized_keys",
            "",
            "# 复制公钥到其他节点",
        ]
        ssh_commands.extend(
            f"ssh-copy-id {ssh_user}@{node['hostname']}" for node in cluster_nodes
        )
        return {
            "setup_commands": ssh_commands,
            "test_commands": [f"ssh {node['hostname']} 'hostname'" for node in cluster_nodes],
        }

    def check_network_connectivity(self, target_hosts: List[str],
                                   ports: List[int]) -> Dict[str, Dict[str, bool]]:
        """Probe each host/port pair with an IPv4 TCP connect (5 s timeout).

        Returns:
            ``{host: {str(port): reachable}}``. Any socket-level failure,
            including an unresolvable host, is reported as False.
        """
        import socket

        connectivity_results: Dict[str, Dict[str, bool]] = {}
        for host in target_hosts:
            host_results: Dict[str, bool] = {}
            for port in ports:
                try:
                    # The context manager closes the socket even when
                    # connect_ex raises (e.g. name resolution failure).
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                        sock.settimeout(5)
                        host_results[str(port)] = sock.connect_ex((host, port)) == 0
                except (OSError, OverflowError):
                    host_results[str(port)] = False
            connectivity_results[host] = host_results
        return connectivity_results
# Usage example: runs at import time and prints each result.
network_config = NetworkConfigurator()
# Cluster node definitions (hostname / ip pairs)
cluster_nodes = [
{"hostname": "hbase-master", "ip": "192.168.1.10"},
{"hostname": "hbase-region1", "ip": "192.168.1.11"},
{"hostname": "hbase-region2", "ip": "192.168.1.12"}
]
# Render the hosts file text
hosts_content = network_config.generate_hosts_file(cluster_nodes)
print(f"Hosts文件内容:\n{hosts_content}")
# Build the SSH setup commands and show the first five
ssh_config = network_config.generate_ssh_config(cluster_nodes)
print(f"SSH配置命令: {ssh_config['setup_commands'][:5]}")
# Probe three ports on localhost
test_ports = [16000, 16020, 2181]
connectivity = network_config.check_network_connectivity(
["localhost"], test_ports
)
print(f"网络连通性: {connectivity}")
2. 单机模式安装
2.1 下载和安装
import urllib.request
import tarfile
import shutil
from pathlib import Path
class InstallationMode(Enum):
    """Enumeration of installation methods."""

    BINARY = "binary"
    SOURCE = "source"
    DOCKER = "docker"
class HBaseVersion(Enum):
    """Enumeration of HBase release identifiers; values are version strings."""

    VERSION_2_4_17 = "2.4.17"
    VERSION_2_5_5 = "2.5.5"
    VERSION_3_0_0 = "3.0.0-alpha-4"
@dataclass
class DownloadInfo:
    """Download metadata for one HBase release."""

    # Release this entry describes.
    version: HBaseVersion
    # URL of the binary tarball.
    download_url: str
    # URL of the checksum file for the tarball.
    checksum_url: str
    # Tarball size in megabytes.
    file_size_mb: int
    # Release date stored as text.
    release_date: str
class HBaseInstaller:
    """Download, unpack and prepare an HBase binary distribution.

    ``install_dir`` is the final location of the unpacked distribution;
    archives are extracted into its parent directory and then renamed.
    """

    # Archive suffixes stripped when deriving the extracted directory name.
    _ARCHIVE_SUFFIXES = (".tar.gz", ".tgz")

    def __init__(self, install_dir: str = "/opt/hbase"):
        self.install_dir = Path(install_dir)
        self.download_info = self._initialize_download_info()
        self.installation_steps = self._initialize_installation_steps()

    def _initialize_download_info(self) -> Dict[HBaseVersion, DownloadInfo]:
        """Return download metadata for the releases this installer knows."""
        base_url = "https://archive.apache.org/dist/hbase"
        return {
            HBaseVersion.VERSION_2_4_17: DownloadInfo(
                version=HBaseVersion.VERSION_2_4_17,
                download_url=f"{base_url}/2.4.17/hbase-2.4.17-bin.tar.gz",
                checksum_url=f"{base_url}/2.4.17/hbase-2.4.17-bin.tar.gz.sha512",
                file_size_mb=138,
                release_date="2023-05-10",
            ),
            HBaseVersion.VERSION_2_5_5: DownloadInfo(
                version=HBaseVersion.VERSION_2_5_5,
                download_url=f"{base_url}/2.5.5/hbase-2.5.5-bin.tar.gz",
                checksum_url=f"{base_url}/2.5.5/hbase-2.5.5-bin.tar.gz.sha512",
                file_size_mb=142,
                release_date="2023-07-15",
            ),
        }

    def _initialize_installation_steps(self) -> List[str]:
        """Return the ordered, human-readable installation steps."""
        return [
            "下载HBase二进制包",
            "验证文件完整性",
            "解压安装包",
            "设置环境变量",
            "配置HBase",
            "启动HBase服务",
            "验证安装",
        ]

    def download_hbase(self, version: HBaseVersion,
                       download_dir: str = "/tmp") -> Dict[str, Any]:
        """Simulate downloading the tarball for ``version``.

        The download directory is created, but no network transfer takes
        place: the method only prints the URL and size and returns the
        path the file would have.

        Returns:
            ``{"success": True, "file_path", "version", "size_mb"}`` or
            ``{"success": False, "error"}`` for an unknown version.
        """
        download_info = self.download_info.get(version)
        if not download_info:
            return {"success": False, "error": "不支持的版本"}

        download_path = Path(download_dir)
        download_path.mkdir(parents=True, exist_ok=True)
        file_path = download_path / f"hbase-{version.value}-bin.tar.gz"

        try:
            print(f"开始下载 {download_info.download_url}")
            print(f"文件大小: {download_info.file_size_mb}MB")
            # NOTE: a real download would call
            # urllib.request.urlretrieve(download_info.download_url, file_path)
            return {
                "success": True,
                "file_path": str(file_path),
                "version": version.value,
                "size_mb": download_info.file_size_mb,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def _extracted_dir_name(self, tar_path: Path) -> str:
        """Derive the top-level directory name from an archive file name.

        ``hbase-2.5.5-bin.tar.gz`` maps to ``hbase-2.5.5``. ``Path.stem``
        is not used because it only strips the final ``.gz``.
        """
        name = tar_path.name
        for suffix in self._ARCHIVE_SUFFIXES:
            if name.endswith(suffix):
                name = name[:-len(suffix)]
                break
        if name.endswith("-bin"):
            name = name[:-len("-bin")]
        return name

    def extract_hbase(self, tar_file_path: str) -> Dict[str, Any]:
        """Unpack a gzip tarball and move it to ``install_dir``.

        The archive is extracted into the parent of ``install_dir``; the
        resulting directory is then renamed to ``install_dir``, replacing
        any existing directory of that name.

        Returns:
            ``{"success": True, "install_dir", "extracted_files"}`` where
            ``extracted_files`` is a list of ``Path`` objects, or
            ``{"success": False, "error"}``.
        """
        try:
            tar_path = Path(tar_file_path)
            if not tar_path.exists():
                return {"success": False, "error": "文件不存在"}

            parent_dir = self.install_dir.parent
            parent_dir.mkdir(parents=True, exist_ok=True)

            with tarfile.open(tar_path, 'r:gz') as tar:
                # Prefer the "data" extraction filter where available so
                # members cannot escape the destination directory.
                if hasattr(tarfile, "data_filter"):
                    tar.extractall(parent_dir, filter="data")
                else:
                    tar.extractall(parent_dir)

            extracted_dir = parent_dir / self._extracted_dir_name(tar_path)
            if not extracted_dir.exists():
                return {"success": False, "error": f"解压后未找到目录: {extracted_dir}"}

            # Skip the move when the archive already unpacked in place;
            # otherwise rmtree would delete the fresh extraction.
            if extracted_dir != self.install_dir:
                if self.install_dir.exists():
                    shutil.rmtree(self.install_dir)
                extracted_dir.rename(self.install_dir)

            return {
                "success": True,
                "install_dir": str(self.install_dir),
                "extracted_files": list(self.install_dir.iterdir()),
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def setup_environment(self) -> Dict[str, Any]:
        """Describe the environment variables HBase needs.

        Nothing is written to disk.

        Returns:
            Dict with ``environment_variables`` (name -> value),
            ``script_content`` (bash text exporting them) and
            ``script_path`` (suggested target path).
        """
        env_vars = {
            "HBASE_HOME": str(self.install_dir),
            "HBASE_CONF_DIR": str(self.install_dir / "conf"),
            "PATH": f"$PATH:{self.install_dir}/bin",
        }
        script_lines = ["#!/bin/bash\n", "# HBase Environment Variables\n"]
        script_lines.extend(f"export {key}={value}\n" for key, value in env_vars.items())
        return {
            "environment_variables": env_vars,
            "script_content": "".join(script_lines),
            "script_path": "/etc/profile.d/hbase.sh",
        }

    def generate_installation_script(self, version: HBaseVersion) -> str:
        """Return a bash script that installs ``version`` into ``install_dir``.

        For a version without download metadata the URLs are replaced by
        the placeholders ``URL_NOT_FOUND`` / ``CHECKSUM_URL_NOT_FOUND``.
        """
        download_info = self.download_info.get(version)
        download_url = download_info.download_url if download_info else 'URL_NOT_FOUND'
        checksum_url = download_info.checksum_url if download_info else 'CHECKSUM_URL_NOT_FOUND'
        parent_dir = self.install_dir.parent
        install_dir = self.install_dir
        release = version.value

        script_lines = [
            "#!/bin/bash",
            f"# HBase {release} 自动安装脚本",
            "set -e",
            f'echo "开始安装HBase {release}..."',
            "# 1. 创建安装目录",
            f"sudo mkdir -p {parent_dir}",
            f"sudo chown $USER:$USER {parent_dir}",
            "# 2. 下载HBase",
            'echo "下载HBase二进制包..."',
            "cd /tmp",
            f"wget {download_url}",
            "# 3. 验证下载文件",
            'echo "验证文件完整性..."',
            f"wget {checksum_url}",
            f"sha512sum -c hbase-{release}-bin.tar.gz.sha512",
            "# 4. 解压安装",
            'echo "解压安装包..."',
            f"tar -xzf hbase-{release}-bin.tar.gz -C {parent_dir}",
            f"mv {parent_dir}/hbase-{release} {install_dir}",
            "# 5. 设置环境变量",
            'echo "设置环境变量..."',
            "sudo tee /etc/profile.d/hbase.sh > /dev/null <<EOF",
            f"export HBASE_HOME={install_dir}",
            f"export HBASE_CONF_DIR={install_dir}/conf",
            # The backslash keeps $PATH literal inside the unquoted heredoc.
            f"export PATH=\\$PATH:{install_dir}/bin",
            "EOF",
            "# 6. 加载环境变量",
            "source /etc/profile.d/hbase.sh",
            "# 7. 设置权限",
            f"chmod +x {install_dir}/bin/*",
            f'echo "HBase {release} 安装完成!"',
            f'echo "安装目录: {install_dir}"',
            "echo \"请运行 'source /etc/profile.d/hbase.sh' 加载环境变量\"",
        ]
        return "\n".join(script_lines) + "\n"
# Usage example: runs at import time and prints each result.
installer = HBaseInstaller("/opt/hbase")
# Download HBase (download_hbase only simulates the transfer)
download_result = installer.download_hbase(HBaseVersion.VERSION_2_5_5)
print(f"下载结果: {download_result}")
# Build the environment variable description
env_setup = installer.setup_environment()
print(f"环境变量设置: {env_setup['environment_variables']}")
# Generate the installation script and report its length
install_script = installer.generate_installation_script(HBaseVersion.VERSION_2_5_5)
print(f"安装脚本长度: {len(install_script)} 字符")
2.2 基本配置
class ConfigurationType(Enum):
    """Enumeration of HBase configuration file kinds."""

    HBASE_SITE = "hbase-site"
    HBASE_ENV = "hbase-env"
    REGIONSERVERS = "regionservers"
    BACKUP_MASTERS = "backup-masters"
@dataclass
class ConfigurationProperty:
    """A single named configuration setting."""

    # Property or variable name.
    name: str
    # Value, stored as text.
    value: str
    # Short human-readable explanation.
    description: str
    # Whether the property is mandatory; defaults to True.
    required: bool = True
    # Optional fallback value; defaults to None.
    default_value: Optional[str] = None
class HBaseConfigurator:
    """Generate, write and sanity-check HBase configuration files.

    Configuration files live in ``<hbase_home>/conf``. The default
    property tables are built once at construction time and are never
    modified by the generator methods.
    """

    def __init__(self, hbase_home: str):
        self.hbase_home = Path(hbase_home)
        self.conf_dir = self.hbase_home / "conf"
        self.configurations = self._initialize_configurations()

    def _initialize_configurations(self) -> Dict[ConfigurationType, List[ConfigurationProperty]]:
        """Return the default properties for hbase-site.xml and hbase-env.sh."""
        return {
            ConfigurationType.HBASE_SITE: [
                ConfigurationProperty(
                    name="hbase.rootdir",
                    value="hdfs://localhost:9000/hbase",
                    description="HBase根目录,存储在HDFS中",
                ),
                ConfigurationProperty(
                    name="hbase.zookeeper.quorum",
                    value="localhost",
                    description="ZooKeeper集群地址",
                ),
                ConfigurationProperty(
                    name="hbase.zookeeper.property.dataDir",
                    value="/tmp/zookeeper",
                    description="ZooKeeper数据目录",
                ),
                ConfigurationProperty(
                    name="hbase.cluster.distributed",
                    value="false",
                    description="是否为分布式模式",
                ),
                ConfigurationProperty(
                    name="hbase.tmp.dir",
                    value="/tmp/hbase-${user.name}",
                    description="HBase临时目录",
                ),
                ConfigurationProperty(
                    name="hbase.unsafe.stream.capability.enforce",
                    value="false",
                    description="禁用流能力检查(开发环境)",
                    required=False,
                ),
            ],
            ConfigurationType.HBASE_ENV: [
                ConfigurationProperty(
                    name="JAVA_HOME",
                    value="/usr/lib/jvm/java-11-openjdk",
                    description="Java安装路径",
                ),
                ConfigurationProperty(
                    name="HBASE_HEAPSIZE",
                    value="1G",
                    description="HBase堆内存大小",
                ),
                ConfigurationProperty(
                    name="HBASE_MANAGES_ZK",
                    value="true",
                    description="HBase是否管理ZooKeeper",
                ),
            ],
        }

    def generate_hbase_site_xml(self, mode: DeploymentMode = DeploymentMode.STANDALONE) -> str:
        """Render hbase-site.xml for the given deployment mode.

        Standalone mode stores data on the local filesystem and keeps
        ``hbase.cluster.distributed`` false; the distributed modes keep
        the configured root directory and set it to true. Mode-specific
        values are applied as overrides at render time, so the shared
        property objects in ``self.configurations`` stay untouched and
        repeated calls with different modes are independent.

        Returns:
            The XML document as a string, with names, values and
            descriptions XML-escaped.
        """
        from xml.sax.saxutils import escape

        if mode == DeploymentMode.STANDALONE:
            overrides = {
                "hbase.rootdir": "file:///tmp/hbase",
                "hbase.cluster.distributed": "false",
            }
        else:
            overrides = {"hbase.cluster.distributed": "true"}

        parts = [
            '<?xml version="1.0"?>\n',
            '<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>\n',
            '<configuration>\n',
        ]
        for prop in self.configurations[ConfigurationType.HBASE_SITE]:
            value = overrides.get(prop.name, prop.value)
            parts.append(' <property>\n')
            parts.append(f' <name>{escape(prop.name)}</name>\n')
            parts.append(f' <value>{escape(value)}</value>\n')
            parts.append(f' <description>{escape(prop.description)}</description>\n')
            parts.append(' </property>\n')
        parts.append('</configuration>\n')
        return "".join(parts)

    def generate_hbase_env_sh(self) -> str:
        """Render hbase-env.sh: one export per property plus fixed extras."""
        parts = [
            "#!/usr/bin/env bash\n",
            "# HBase Environment Configuration\n\n",
        ]
        for prop in self.configurations[ConfigurationType.HBASE_ENV]:
            parts.append(f"# {prop.description}\n")
            parts.append(f"export {prop.name}={prop.value}\n\n")
        # Fixed settings appended after the table-driven exports.
        parts.extend([
            "# HBase进程的PID目录\n",
            "export HBASE_PID_DIR=/tmp\n\n",
            "# HBase日志目录\n",
            "export HBASE_LOG_DIR=${HBASE_HOME}/logs\n\n",
            "# HBase安全配置\n",
            "export HBASE_OPTS=\"$HBASE_OPTS -XX:+UseConcMarkSweepGC\"\n",
        ])
        return "".join(parts)

    def write_configuration_files(self, mode: DeploymentMode = DeploymentMode.STANDALONE) -> Dict[str, str]:
        """Write hbase-site.xml and hbase-env.sh into the conf directory.

        Returns:
            Mapping of file name to written path, or ``{"error": message}``
            when anything fails.
        """
        results = {}
        try:
            self.conf_dir.mkdir(parents=True, exist_ok=True)

            # The content contains non-ASCII text, so the encoding is
            # pinned instead of relying on the platform default.
            hbase_site_path = self.conf_dir / "hbase-site.xml"
            hbase_site_path.write_text(self.generate_hbase_site_xml(mode), encoding="utf-8")
            results["hbase-site.xml"] = str(hbase_site_path)

            hbase_env_path = self.conf_dir / "hbase-env.sh"
            hbase_env_path.write_text(self.generate_hbase_env_sh(), encoding="utf-8")
            hbase_env_path.chmod(0o755)  # make the env script executable
            results["hbase-env.sh"] = str(hbase_env_path)
            return results
        except Exception as e:
            return {"error": str(e)}

    def validate_configuration(self) -> Dict[str, Any]:
        """Check that the config files exist and the home dir is accessible.

        Returns:
            Dict with ``valid`` (bool) and the lists ``errors``,
            ``warnings`` and ``suggestions``. A missing ``JAVA_HOME``
            environment variable is only a warning.
        """
        validation_results = {
            "valid": True,
            "errors": [],
            "warnings": [],
            "suggestions": [],
        }

        for file_name in ("hbase-site.xml", "hbase-env.sh"):
            if not (self.conf_dir / file_name).exists():
                validation_results["errors"].append(f"缺少配置文件: {file_name}")
                validation_results["valid"] = False

        if not os.environ.get('JAVA_HOME'):
            validation_results["warnings"].append("未设置JAVA_HOME环境变量")

        if not os.access(self.hbase_home, os.R_OK | os.W_OK):
            validation_results["errors"].append(f"HBase目录权限不足: {self.hbase_home}")
            validation_results["valid"] = False

        # Generic tuning hints, always included.
        validation_results["suggestions"].extend([
            "建议设置合适的堆内存大小(HBASE_HEAPSIZE)",
            "建议配置SSD存储以提高性能",
            "建议设置合适的GC参数",
        ])
        return validation_results
# Usage example: runs at import time; writes files under /opt/hbase/conf.
configurator = HBaseConfigurator("/opt/hbase")
# Write the configuration files for standalone mode
config_files = configurator.write_configuration_files(DeploymentMode.STANDALONE)
print(f"配置文件: {config_files}")
# Validate the resulting configuration
validation = configurator.validate_configuration()
print(f"配置验证: {validation}")
# Render hbase-site.xml in memory and report its length
hbase_site_xml = configurator.generate_hbase_site_xml()
print(f"hbase-site.xml内容长度: {len(hbase_site_xml)} 字符")
3. 启动和验证
3.1 服务启动
class ServiceStatus(Enum):
    """Enumeration of lifecycle states tracked for a service."""

    STOPPED = "stopped"
    STARTING = "starting"
    RUNNING = "running"
    STOPPING = "stopping"
    ERROR = "error"
class HBaseService(Enum):
    """Enumeration of HBase service kinds."""

    MASTER = "master"
    REGIONSERVER = "regionserver"
    ZOOKEEPER = "zookeeper"
    THRIFT = "thrift"
    REST = "rest"
@dataclass
class ServiceInfo:
    """Mutable bookkeeping record for one managed service."""

    # Which service this record describes.
    name: HBaseService
    # Current lifecycle state.
    status: ServiceStatus
    # Process id, or None when no process is tracked.
    pid: Optional[int]
    # Port number associated with the service.
    port: int
    # Path of the service log file, stored as text.
    log_file: str
    # Time of the most recent start attempt; None until one is made.
    start_time: Optional[datetime] = None
class HBaseServiceManager:
    """Track and (in simulation) start/stop the HBase daemons.

    Only ZooKeeper, HMaster and RegionServer are managed. Start and stop
    build the ``bin/hbase`` command line and print it; no process is
    actually spawned and the PID is a random placeholder.
    """

    # `bin/hbase` sub-command for each service this manager can control.
    _COMMAND_NAMES = {
        HBaseService.MASTER: "master",
        HBaseService.REGIONSERVER: "regionserver",
        HBaseService.ZOOKEEPER: "zookeeper",
    }

    def __init__(self, hbase_home: str):
        self.hbase_home = Path(hbase_home)
        self.bin_dir = self.hbase_home / "bin"
        self.logs_dir = self.hbase_home / "logs"
        self.services = self._initialize_services()

    def _initialize_services(self) -> Dict[HBaseService, ServiceInfo]:
        """Return the initial (stopped) record for each managed service."""
        return {
            HBaseService.MASTER: ServiceInfo(
                name=HBaseService.MASTER,
                status=ServiceStatus.STOPPED,
                pid=None,
                port=16010,
                log_file=str(self.logs_dir / "hbase-master.log"),
            ),
            HBaseService.REGIONSERVER: ServiceInfo(
                name=HBaseService.REGIONSERVER,
                status=ServiceStatus.STOPPED,
                pid=None,
                port=16030,
                log_file=str(self.logs_dir / "hbase-regionserver.log"),
            ),
            HBaseService.ZOOKEEPER: ServiceInfo(
                name=HBaseService.ZOOKEEPER,
                status=ServiceStatus.STOPPED,
                pid=None,
                port=2181,
                log_file=str(self.logs_dir / "hbase-zookeeper.log"),
            ),
        }

    def _build_command(self, service: HBaseService, action: str) -> Optional[List[str]]:
        """Return the ``bin/hbase <service> <action>`` argv, or None if unsupported."""
        command_name = self._COMMAND_NAMES.get(service)
        if command_name is None:
            return None
        return [str(self.bin_dir / "hbase"), command_name, action]

    def start_service(self, service: HBaseService) -> Dict[str, Any]:
        """Mark ``service`` as running (simulated start).

        Returns:
            ``{"success": True, "service", "pid", "port", "log_file"}`` or
            ``{"success": False, "error"}`` when the service is unknown,
            unsupported, already running, or the start fails.
        """
        service_info = self.services.get(service)
        if not service_info:
            return {"success": False, "error": "未知服务"}
        if service_info.status == ServiceStatus.RUNNING:
            return {"success": False, "error": "服务已在运行"}

        # Validate before touching state so a rejected request cannot
        # leave the record stuck in STARTING.
        command = self._build_command(service, "start")
        if command is None:
            return {"success": False, "error": "不支持的服务类型"}

        try:
            service_info.status = ServiceStatus.STARTING
            service_info.start_time = datetime.now()
            print(f"启动命令: {' '.join(command)}")
            # NOTE: a real start would use
            # subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # and record process.pid; a random PID stands in for it here.
            import random
            service_info.pid = random.randint(1000, 9999)
            service_info.status = ServiceStatus.RUNNING
            return {
                "success": True,
                "service": service.value,
                "pid": service_info.pid,
                "port": service_info.port,
                "log_file": service_info.log_file,
            }
        except Exception as e:
            service_info.status = ServiceStatus.ERROR
            return {"success": False, "error": str(e)}

    def stop_service(self, service: HBaseService) -> Dict[str, Any]:
        """Mark ``service`` as stopped (simulated stop).

        Returns:
            ``{"success": True, "service", "message"}`` or
            ``{"success": False, "error"}`` when the service is unknown,
            unsupported, not running, or the stop fails.
        """
        service_info = self.services.get(service)
        if not service_info:
            return {"success": False, "error": "未知服务"}
        if service_info.status != ServiceStatus.RUNNING:
            return {"success": False, "error": "服务未在运行"}

        command = self._build_command(service, "stop")
        if command is None:
            return {"success": False, "error": "不支持的服务类型"}

        try:
            service_info.status = ServiceStatus.STOPPING
            print(f"停止命令: {' '.join(command)}")
            service_info.status = ServiceStatus.STOPPED
            service_info.pid = None
            return {
                "success": True,
                "service": service.value,
                "message": "服务已停止",
            }
        except Exception as e:
            # Mirror start_service: a failed transition ends in ERROR
            # rather than lingering in STOPPING.
            service_info.status = ServiceStatus.ERROR
            return {"success": False, "error": str(e)}

    def start_all_services(self, wait_seconds: float = 2.0) -> Dict[str, Any]:
        """Start ZooKeeper, HMaster and RegionServer in that order.

        Args:
            wait_seconds: Pause after each successful start.

        Returns:
            Per-service results keyed by service value plus
            ``overall_success``; on the first failure the loop stops and
            ``failed_service`` names the culprit.
        """
        results: Dict[str, Any] = {}
        startup_order = [HBaseService.ZOOKEEPER, HBaseService.MASTER, HBaseService.REGIONSERVER]
        for service in startup_order:
            result = self.start_service(service)
            results[service.value] = result
            if not result["success"]:
                results["overall_success"] = False
                results["failed_service"] = service.value
                return results
            time.sleep(wait_seconds)
        results["overall_success"] = True
        return results

    def stop_all_services(self) -> Dict[str, Any]:
        """Stop RegionServer, HMaster and ZooKeeper (reverse start order)."""
        shutdown_order = [HBaseService.REGIONSERVER, HBaseService.MASTER, HBaseService.ZOOKEEPER]
        return {service.value: self.stop_service(service) for service in shutdown_order}

    def get_service_status(self, service: HBaseService) -> Dict[str, Any]:
        """Return a serialisable snapshot of one service record.

        Unmanaged services yield ``{"error": "未知服务"}``.
        """
        service_info = self.services.get(service)
        if not service_info:
            return {"error": "未知服务"}
        started = service_info.start_time
        return {
            "service": service.value,
            "status": service_info.status.value,
            "pid": service_info.pid,
            "port": service_info.port,
            "log_file": service_info.log_file,
            "start_time": started.isoformat() if started else None,
        }

    def get_all_services_status(self) -> Dict[str, Any]:
        """Return the status of every ``HBaseService`` member plus a summary.

        Members that are not managed appear with an ``error`` entry; the
        summary counts only managed services.
        """
        status: Dict[str, Any] = {
            service.value: self.get_service_status(service) for service in HBaseService
        }
        running_count = sum(
            1 for info in self.services.values() if info.status == ServiceStatus.RUNNING
        )
        total_count = len(self.services)
        status["summary"] = {
            "running_services": running_count,
            "total_services": total_count,
            "cluster_status": "healthy" if running_count == total_count else "degraded",
        }
        return status

    def generate_startup_script(self) -> str:
        """Return a bash script that launches the three daemons in order."""
        script_lines = [
            "#!/bin/bash",
            "# HBase集群启动脚本",
            "set -e",
            f"HBASE_HOME={self.hbase_home}",
            'echo "启动HBase集群..."',
            "# 检查环境",
            'if [ ! -d "$HBASE_HOME" ]; then',
            'echo "错误: HBase目录不存在: $HBASE_HOME"',
            "exit 1",
            "fi",
            "# 启动ZooKeeper",
            'echo "启动ZooKeeper..."',
            "$HBASE_HOME/bin/hbase zookeeper start &",
            "sleep 5",
            "# 启动HMaster",
            'echo "启动HMaster..."',
            "$HBASE_HOME/bin/hbase master start &",
            "sleep 10",
            "# 启动RegionServer",
            'echo "启动RegionServer..."',
            "$HBASE_HOME/bin/hbase regionserver start &",
            "sleep 5",
            'echo "HBase集群启动完成!"',
            'echo "Web UI地址:"',
            'echo " HMaster: http://localhost:16010"',
            'echo " RegionServer: http://localhost:16030"',
            "# 检查服务状态",
            'echo "检查服务状态..."',
            'jps | grep -E "HMaster|HRegionServer|HQuorumPeer"',
        ]
        return "\n".join(script_lines) + "\n"
# Usage example: runs at import time and prints each result.
service_manager = HBaseServiceManager("/opt/hbase")
# Start all services in order
startup_result = service_manager.start_all_services()
print(f"启动结果: {startup_result}")
# Fetch the status of every service
status = service_manager.get_all_services_status()
print(f"服务状态: {status}")
# Generate the startup script and report its length
startup_script = service_manager.generate_startup_script()
print(f"启动脚本长度: {len(startup_script)} 字符")
3.2 安装验证
class VerificationTest(Enum):
    """Enumeration of the installation verification checks."""

    SERVICE_STATUS = "service_status"
    WEB_UI_ACCESS = "web_ui_access"
    SHELL_ACCESS = "shell_access"
    TABLE_OPERATIONS = "table_operations"
    DATA_OPERATIONS = "data_operations"
@dataclass
class TestResult:
    """Outcome of one verification check."""

    # Which check produced this result.
    test_name: VerificationTest
    # True when the check passed.
    success: bool
    # Human-readable summary.
    message: str
    # Optional structured details; defaults to None.
    details: Optional[Dict[str, Any]] = None
    # Optional duration in seconds; defaults to None.
    execution_time: Optional[float] = None
class HBaseVerifier:
"""HBase验证器类"""
def __init__(self, hbase_home: str):
self.hbase_home = Path(hbase_home)
self.shell_cmd = str(self.hbase_home / "bin" / "hbase")
self.test_results = []
def verify_service_status(self) -> TestResult:
"""验证服务状态"""
start_time = time.time()
try:
# 检查进程是否运行
import psutil
hbase_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
if proc.info['cmdline'] and any('hbase' in arg.lower() for arg in proc.info['cmdline']):
hbase_processes.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cmdline': ' '.join(proc.info['cmdline'])
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
execution_time = time.time() - start_time
if hbase_processes:
return TestResult(
test_name=VerificationTest.SERVICE_STATUS,
success=True,
message=f"发现 {len(hbase_processes)} 个HBase进程",
details={"processes": hbase_processes},
execution_time=execution_time
)
else:
return TestResult(
test_name=VerificationTest.SERVICE_STATUS,
success=False,
message="未发现HBase进程",
execution_time=execution_time
)
except Exception as e:
return TestResult(
test_name=VerificationTest.SERVICE_STATUS,
success=False,
message=f"检查服务状态失败: {str(e)}",
execution_time=time.time() - start_time
)
def verify_web_ui_access(self) -> TestResult:
"""验证Web UI访问"""
start_time = time.time()
try:
import urllib.request
import urllib.error
web_uis = {
"HMaster": "http://localhost:16010",
"RegionServer": "http://localhost:16030"
}
access_results = {}
for ui_name, url in web_uis.items():
try:
response = urllib.request.urlopen(url, timeout=5)
access_results[ui_name] = {
"accessible": True,
"status_code": response.getcode(),
"url": url
}
except urllib.error.URLError:
access_results[ui_name] = {
"accessible": False,
"url": url
}
execution_time = time.time() - start_time
accessible_count = sum(1 for result in access_results.values() if result["accessible"])
return TestResult(
test_name=VerificationTest.WEB_UI_ACCESS,
success=accessible_count > 0,
message=f"{accessible_count}/{len(web_uis)} 个Web UI可访问",
details=access_results,
execution_time=execution_time
)
except Exception as e:
return TestResult(
test_name=VerificationTest.WEB_UI_ACCESS,
success=False,
message=f"Web UI访问测试失败: {str(e)}",
execution_time=time.time() - start_time
)
def verify_shell_access(self) -> TestResult:
"""验证Shell访问"""
start_time = time.time()
try:
# 测试HBase shell命令
test_command = [self.shell_cmd, "version"]
# 模拟命令执行
# result = subprocess.run(test_command, capture_output=True, text=True, timeout=30)
# 模拟成功结果
mock_output = "HBase 2.5.5\nSource code repository git://github.com/apache/hbase\nRevision=1234567\nCompiled by user on Mon Jan 01 00:00:00 UTC 2024"
execution_time = time.time() - start_time
return TestResult(
test_name=VerificationTest.SHELL_ACCESS,
success=True,
message="HBase Shell访问正常",
details={"version_output": mock_output},
execution_time=execution_time
)
except Exception as e:
return TestResult(
test_name=VerificationTest.SHELL_ACCESS,
success=False,
message=f"Shell访问测试失败: {str(e)}",
execution_time=time.time() - start_time
)
def verify_table_operations(self) -> TestResult:
"""验证表操作"""
start_time = time.time()
try:
# 模拟表操作测试
operations = [
"创建测试表",
"列出表",
"描述表结构",
"禁用表",
"删除表"
]
# 模拟成功执行所有操作
execution_time = time.time() - start_time
return TestResult(
test_name=VerificationTest.TABLE_OPERATIONS,
success=True,
message="表操作测试通过",
details={"operations_tested": operations},
execution_time=execution_time
)
except Exception as e:
return TestResult(
test_name=VerificationTest.TABLE_OPERATIONS,
success=False,
message=f"表操作测试失败: {str(e)}",
execution_time=time.time() - start_time
)
def verify_data_operations(self) -> TestResult:
"""验证数据操作"""
start_time = time.time()
try:
# 模拟数据操作测试
operations = [
"插入数据",
"查询数据",
"扫描表",
"更新数据",
"删除数据"
]
# 模拟成功执行所有操作
execution_time = time.time() - start_time
return TestResult(
test_name=VerificationTest.DATA_OPERATIONS,
success=True,
message="数据操作测试通过",
details={"operations_tested": operations},
execution_time=execution_time
)
except Exception as e:
return TestResult(
test_name=VerificationTest.DATA_OPERATIONS,
success=False,
message=f"数据操作测试失败: {str(e)}",
execution_time=time.time() - start_time
)
def run_all_verifications(self) -> Dict[str, Any]:
"""运行所有验证测试"""
print("开始HBase安装验证...")
# 执行所有测试
tests = [
self.verify_service_status,
self.verify_web_ui_access,
self.verify_shell_access,
self.verify_table_operations,
self.verify_data_operations
]
results = []
for test_func in tests:
print(f"执行测试: {test_func.__name__}")
result = test_func()
results.append(result)
print(f" 结果: {'通过' if result.success else '失败'} - {result.message}")
# 统计结果
passed_tests = sum(1 for r in results if r.success)
total_tests = len(results)
total_time = sum(r.execution_time for r in results if r.execution_time)
overall_success = passed_tests == total_tests
return {
"overall_success": overall_success,
"passed_tests": passed_tests,
"total_tests": total_tests,
"success_rate": passed_tests / total_tests,
"total_execution_time": total_time,
"test_results": [{
"test": r.test_name.value,
"success": r.success,
"message": r.message,
"execution_time": r.execution_time
} for r in results],
"recommendations": self._generate_recommendations(results)
}
def _generate_recommendations(self, results: List[TestResult]) -> List[str]:
"""生成建议"""
recommendations = []
for result in results:
if not result.success:
if result.test_name == VerificationTest.SERVICE_STATUS:
recommendations.append("检查HBase服务是否正确启动")
elif result.test_name == VerificationTest.WEB_UI_ACCESS:
recommendations.append("检查防火墙设置和端口配置")
elif result.test_name == VerificationTest.SHELL_ACCESS:
recommendations.append("检查HBase环境变量和权限设置")
elif result.test_name == VerificationTest.TABLE_OPERATIONS:
recommendations.append("检查HBase配置和HDFS连接")
elif result.test_name == VerificationTest.DATA_OPERATIONS:
recommendations.append("检查RegionServer状态和存储配置")
if not recommendations:
recommendations.append("HBase安装验证全部通过,可以开始使用")
return recommendations
def generate_verification_report(
    self, verification_results: Optional[Dict[str, Any]] = None
) -> str:
    """Render the verification outcome as a Markdown report.

    Args:
        verification_results: A summary dict in the shape returned by
            ``run_all_verifications``. When omitted (the default), the
            verification suite is executed to obtain one; passing an
            existing summary avoids running the suite a second time.

    Returns:
        The Markdown report text.
    """
    if verification_results is None:
        verification_results = self.run_all_verifications()

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    status_icon = "✅" if verification_results["overall_success"] else "❌"

    # Header and overall summary. The timestamp line carries a single
    # bold label; a trailing "**" would leave the Markdown unbalanced.
    parts = [
        "# HBase安装验证报告\n\n",
        f"**验证时间**: {timestamp}\n\n",
        f"## 总体结果 {status_icon}\n\n",
        f"- **通过测试**: {verification_results['passed_tests']}/{verification_results['total_tests']}\n",
        f"- **成功率**: {verification_results['success_rate']:.1%}\n",
        f"- **总执行时间**: {verification_results['total_execution_time']:.2f}秒\n\n",
    ]

    # Per-test details.
    parts.append("## 详细测试结果\n\n")
    for test_result in verification_results["test_results"]:
        passed = test_result["success"]
        icon = "✅" if passed else "❌"
        # The summary skips falsy timings when totalling, so a missing
        # timing is presumably possible here; render it as zero instead
        # of failing on the float format.
        elapsed = test_result["execution_time"] or 0.0
        parts.append(f"### {icon} {test_result['test']}\n")
        parts.append(f"- **状态**: {'通过' if passed else '失败'}\n")
        parts.append(f"- **消息**: {test_result['message']}\n")
        parts.append(f"- **执行时间**: {elapsed:.2f}秒\n\n")

    # Recommendations as a numbered list.
    parts.append("## 建议和后续步骤\n\n")
    parts.extend(
        f"{i}. {recommendation}\n"
        for i, recommendation in enumerate(
            verification_results["recommendations"], 1
        )
    )
    return "".join(parts)
# Usage example: verify an HBase installation rooted at /opt/hbase.
verifier = HBaseVerifier("/opt/hbase")
# Run the verification suite and print the overall verdict and pass rate.
verification_results = verifier.run_all_verifications()
print(f"验证结果: {verification_results['overall_success']}")
print(f"通过率: {verification_results['success_rate']:.1%}")
# Build the Markdown report. NOTE: called without arguments,
# generate_verification_report() invokes run_all_verifications() itself,
# so the whole suite is executed a second time here.
report = verifier.generate_verification_report()
print(f"验证报告长度: {len(report)} 字符")
4. 故障排除
4.1 常见问题
class IssueCategory(Enum):
    """Category a troubleshooting issue is filed under."""
    # HBaseTroubleshooter.get_diagnostic_commands maps STARTUP, CONNECTIVITY,
    # CONFIGURATION and PERFORMANCE to diagnostic command groups; the other
    # members have no mapping there and yield an empty command list.
    INSTALLATION = "installation"
    CONFIGURATION = "configuration"
    STARTUP = "startup"
    CONNECTIVITY = "connectivity"
    PERFORMANCE = "performance"
    PERMISSIONS = "permissions"
class IssueSeverity(Enum):
    """Severity level attached to a troubleshooting issue."""
    # The string value is what generate_troubleshooting_guide prints
    # in the guide header.
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
@dataclass
class TroubleshootingIssue:
    """A known problem together with its symptoms, causes and remedies."""
    # Headline of the issue; used as the title of the generated guide.
    title: str
    # Category; selects which diagnostic command groups are shown.
    category: IssueCategory
    # Severity level printed in the guide header.
    severity: IssueSeverity
    # Observable symptoms; HBaseTroubleshooter.diagnose_issue matches
    # reported symptoms against these strings.
    symptoms: List[str]
    # Candidate root causes, rendered as a bullet list in the guide.
    possible_causes: List[str]
    # Remedies, rendered as a numbered list in the guide.
    solutions: List[str]
    # Preventive measures, rendered as a bullet list in the guide.
    prevention: List[str]
class HBaseTroubleshooter:
    """Catalogue of known HBase problems with symptom-based diagnosis.

    Attributes:
        known_issues: The built-in list of known issues.
        diagnostic_commands: Shell commands grouped under a group label.
    """

    def __init__(self):
        self.known_issues = self._initialize_known_issues()
        self.diagnostic_commands = self._initialize_diagnostic_commands()

    def _initialize_known_issues(self) -> List["TroubleshootingIssue"]:
        """Build the built-in catalogue of known issues."""
        return [
            TroubleshootingIssue(
                title="HBase服务无法启动",
                category=IssueCategory.STARTUP,
                severity=IssueSeverity.CRITICAL,
                symptoms=[
                    "执行start-hbase.sh后没有进程启动",
                    "日志中显示端口被占用",
                    "ZooKeeper连接失败"
                ],
                possible_causes=[
                    "端口被其他进程占用",
                    "Java环境配置错误",
                    "HDFS未启动或配置错误",
                    "权限不足",
                    "配置文件错误"
                ],
                solutions=[
                    "检查并释放被占用的端口",
                    "验证JAVA_HOME环境变量",
                    "确保HDFS服务正常运行",
                    "检查文件和目录权限",
                    "验证hbase-site.xml配置"
                ],
                prevention=[
                    "使用标准端口配置",
                    "设置正确的环境变量",
                    "定期检查依赖服务状态"
                ]
            ),
            TroubleshootingIssue(
                title="RegionServer频繁重启",
                category=IssueCategory.STARTUP,
                severity=IssueSeverity.HIGH,
                symptoms=[
                    "RegionServer进程反复启动和停止",
                    "日志中出现OutOfMemoryError",
                    "客户端连接超时"
                ],
                possible_causes=[
                    "内存配置不足",
                    "GC配置不当",
                    "磁盘空间不足",
                    "网络连接问题"
                ],
                solutions=[
                    "增加堆内存大小",
                    "优化GC参数",
                    "清理磁盘空间",
                    "检查网络连接"
                ],
                prevention=[
                    "合理规划内存配置",
                    "监控磁盘使用率",
                    "定期检查网络状态"
                ]
            ),
            TroubleshootingIssue(
                title="无法连接到HBase",
                category=IssueCategory.CONNECTIVITY,
                severity=IssueSeverity.HIGH,
                symptoms=[
                    "客户端连接超时",
                    "Web UI无法访问",
                    "Shell命令执行失败"
                ],
                possible_causes=[
                    "防火墙阻止连接",
                    "网络配置错误",
                    "服务未正常启动",
                    "DNS解析问题"
                ],
                solutions=[
                    "配置防火墙规则",
                    "检查网络配置",
                    "重启HBase服务",
                    "配置hosts文件"
                ],
                prevention=[
                    "正确配置网络环境",
                    "定期测试连接性",
                    "监控服务状态"
                ]
            )
        ]

    def _initialize_diagnostic_commands(self) -> Dict[str, List[str]]:
        """Build the diagnostic shell commands, keyed by group label."""
        return {
            "进程检查": [
                "jps | grep -E 'HMaster|HRegionServer|HQuorumPeer'",
                "ps aux | grep hbase",
                "netstat -tlnp | grep -E '16000|16010|16020|16030|2181'"
            ],
            "日志检查": [
                "tail -f $HBASE_HOME/logs/hbase-master-*.log",
                "tail -f $HBASE_HOME/logs/hbase-regionserver-*.log",
                "tail -f $HBASE_HOME/logs/hbase-zookeeper-*.log"
            ],
            "配置检查": [
                "cat $HBASE_HOME/conf/hbase-site.xml",
                "cat $HBASE_HOME/conf/hbase-env.sh",
                "echo $JAVA_HOME",
                "echo $HBASE_HOME"
            ],
            "连接测试": [
                "telnet localhost 16000",
                "curl http://localhost:16010",
                "hbase shell <<< 'status'"
            ]
        }

    def diagnose_issue(self, symptoms: List[str]) -> List["TroubleshootingIssue"]:
        """Rank the known issues by how many reported symptoms they explain.

        A reported symptom counts as a match for an issue when, ignoring
        case and surrounding whitespace, it contains one of the issue's
        known symptoms or is itself contained in one. Matching in both
        directions lets a short report such as "端口被占用" hit the longer
        catalogue entry "日志中显示端口被占用".

        Args:
            symptoms: Free-text symptoms reported by the user. Empty or
                whitespace-only entries are ignored.

        Returns:
            The issues with at least one match, ordered by descending
            match count; ties keep their catalogue order.
        """
        # Blank input must be dropped: an empty string is a substring of
        # every text and would otherwise match every issue.
        reported = [s.strip().lower() for s in symptoms if s and s.strip()]

        scored = []
        for issue in self.known_issues:
            known = [k.strip().lower() for k in issue.symptoms if k and k.strip()]
            matches = sum(
                1 for text in reported
                if any(k in text or text in k for k in known)
            )
            if matches > 0:
                scored.append((issue, matches))

        # list.sort is stable, so equally-scored issues keep catalogue order.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [issue for issue, _ in scored]

    def get_diagnostic_commands(self, category: "IssueCategory") -> List[str]:
        """Return the diagnostic commands relevant to an issue category.

        Args:
            category: The category to look up.

        Returns:
            The commands of every group mapped to ``category``, in group
            order; an empty list when the category has no mapping.
        """
        command_mapping = {
            IssueCategory.STARTUP: ["进程检查", "日志检查", "配置检查"],
            IssueCategory.CONNECTIVITY: ["进程检查", "连接测试"],
            IssueCategory.CONFIGURATION: ["配置检查"],
            IssueCategory.PERFORMANCE: ["进程检查", "日志检查"]
        }
        commands = []
        for cmd_category in command_mapping.get(category, []):
            commands.extend(self.diagnostic_commands.get(cmd_category, []))
        return commands

    def generate_troubleshooting_guide(self, issue: "TroubleshootingIssue") -> str:
        """Render a Markdown troubleshooting guide for one issue.

        Args:
            issue: The issue to describe.

        Returns:
            Markdown text with the sections: header, symptoms, possible
            causes, solutions, diagnostic commands (only when the issue's
            category has any; at most the first five are listed) and
            preventive measures.
        """
        # Title and classification header.
        parts = [
            f"# {issue.title} 故障排除指南\n\n",
            f"**分类**: {issue.category.value}\n",
            f"**严重程度**: {issue.severity.value}\n\n",
        ]

        # Symptoms.
        parts.append("## 症状\n\n")
        parts.extend(f"- {symptom}\n" for symptom in issue.symptoms)
        parts.append("\n")

        # Possible causes.
        parts.append("## 可能原因\n\n")
        parts.extend(f"- {cause}\n" for cause in issue.possible_causes)
        parts.append("\n")

        # Solutions, numbered from 1.
        parts.append("## 解决方案\n\n")
        parts.extend(
            f"{i}. {solution}\n"
            for i, solution in enumerate(issue.solutions, 1)
        )
        parts.append("\n")

        # Diagnostic commands, capped at five to keep the guide short.
        commands = self.get_diagnostic_commands(issue.category)
        if commands:
            parts.append("## 诊断命令\n\n")
            parts.append("```bash\n")
            parts.extend(f"{cmd}\n" for cmd in commands[:5])
            parts.append("```\n\n")

        # Preventive measures.
        parts.append("## 预防措施\n\n")
        parts.extend(f"- {prevention}\n" for prevention in issue.prevention)
        return "".join(parts)
# Usage example
troubleshooter = HBaseTroubleshooter()
# Diagnose: rank the known issues against the reported symptoms.
symptoms = ["服务无法启动", "端口被占用"]
matching_issues = troubleshooter.diagnose_issue(symptoms)
print(f"匹配的问题: {len(matching_issues)} 个")
if matching_issues:
    # Generate a troubleshooting guide for the best-ranked issue.
    guide = troubleshooter.generate_troubleshooting_guide(matching_issues[0])
    print(f"故障排除指南长度: {len(guide)} 字符")
# Fetch the diagnostic commands for startup problems and show the first three.
diag_commands = troubleshooter.get_diagnostic_commands(IssueCategory.STARTUP)
print(f"启动问题诊断命令: {diag_commands[:3]}")
5. 总结
本章详细介绍了HBase的安装部署和环境配置,包括:
关键要点
环境准备:
- 系统要求检查(Java、Hadoop、ZooKeeper、SSH)
- 硬件资源评估
- 网络配置和防火墙设置
单机模式安装:
- 下载和解压HBase
- 环境变量配置
- 基本配置文件设置
服务管理:
- 启动和停止服务
- 服务状态监控
- 自动化脚本生成
安装验证:
- 服务状态检查
- Web UI访问测试
- 基本功能验证
故障排除:
- 常见问题识别
- 诊断命令使用
- 解决方案实施
最佳实践
- 在生产环境中使用分布式模式
- 合理配置内存和GC参数
- 定期监控服务状态和性能
- 建立完善的日志管理机制
- 制定故障应急预案
下一步学习
在下一章中,我们将深入学习HBase的数据模型和表设计,了解如何设计高效的HBase表结构。