1. HBase Shell操作
1.1 Shell基础命令
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json
import re
import time
class ShellCommandType(Enum):
    """Category of a simulated shell command; `help` groups its listing by this value."""
    DDL = "ddl"  # data definition: assigned to create / drop / describe
    DML = "dml"  # data manipulation: assigned to put / get / scan / delete
    ADMIN = "admin"  # administrative: assigned to list / status / version
    UTILITY = "utility"  # utility commands; not assigned to any command registered in this file
class CommandStatus(Enum):
    """Outcome stored in the "status" field of a shell command-history entry."""
    SUCCESS = "success"
    FAILED = "failed"
    PARTIAL = "partial"  # not produced by any code visible in this file
    TIMEOUT = "timeout"  # not produced by any code visible in this file
@dataclass
class ShellCommand:
    """Static help metadata describing one simulated HBase shell command."""
    command: str  # command keyword, e.g. "create"
    command_type: ShellCommandType  # category; `help` groups commands by its value
    description: str  # short human-readable description
    syntax: str  # usage template, also echoed back on argument-count errors
    examples: List[str]  # sample invocations
    parameters: Dict[str, str]  # parameter name -> description
class HBaseShellSimulator:
    """In-memory simulator for a subset of the HBase shell.

    Tables live in ``self.tables`` as
    ``{table_name: {"name", "column_families", "data", "created_time"}}`` where
    ``data`` maps ``row_key -> {"cf:qualifier": {"value", "timestamp"}}``.
    Every call to :meth:`execute_command` is appended to ``self.command_history``.

    Only positional, comma-separated arguments are understood; HBase dictionary
    options such as ``{NAME => 'cf', VERSIONS => 3}`` are not parsed.
    """

    # One shell token: a single-quoted string, a double-quoted string, or a
    # bare word. Commas and whitespace both separate tokens, so
    #   put 'users', 'user001', 'info:name', 'Zhang San'
    # yields ['put', 'users', 'user001', 'info:name', 'Zhang San'].
    _TOKEN_PATTERN = re.compile(r"""'([^']*)'|"([^"]*)"|([^\s,]+)""")

    def __init__(self):
        self.tables: Dict[str, Dict[str, Any]] = {}
        self.command_history: List[Dict[str, Any]] = []
        self.current_namespace = "default"
        self.shell_commands = self._initialize_commands()

    def _initialize_commands(self) -> Dict[str, "ShellCommand"]:
        """Build the help metadata for every documented command, keyed by name."""
        commands = {}
        # DDL commands
        commands["create"] = ShellCommand(
            command="create",
            command_type=ShellCommandType.DDL,
            description="创建表",
            syntax="create 'table_name', 'column_family1', 'column_family2', ...",
            examples=[
                "create 'users', 'info', 'stats'",
                "create 'logs', {NAME => 'data', VERSIONS => 3}"
            ],
            parameters={
                "table_name": "表名",
                "column_family": "列族名称",
                "VERSIONS": "最大版本数",
                "TTL": "生存时间(秒)",
                "COMPRESSION": "压缩算法"
            }
        )
        commands["drop"] = ShellCommand(
            command="drop",
            command_type=ShellCommandType.DDL,
            description="删除表",
            syntax="drop 'table_name'",
            examples=["drop 'users'"],
            parameters={"table_name": "要删除的表名"}
        )
        commands["describe"] = ShellCommand(
            command="describe",
            command_type=ShellCommandType.DDL,
            description="描述表结构",
            syntax="describe 'table_name'",
            examples=["describe 'users'"],
            parameters={"table_name": "要描述的表名"}
        )
        # DML commands
        commands["put"] = ShellCommand(
            command="put",
            command_type=ShellCommandType.DML,
            description="插入或更新数据",
            syntax="put 'table_name', 'row_key', 'column_family:qualifier', 'value'",
            examples=[
                "put 'users', 'user001', 'info:name', 'Zhang San'",
                "put 'users', 'user001', 'info:age', '25'"
            ],
            parameters={
                "table_name": "表名",
                "row_key": "行键",
                "column": "列名(列族:限定符)",
                "value": "值",
                "timestamp": "时间戳(可选)"
            }
        )
        commands["get"] = ShellCommand(
            command="get",
            command_type=ShellCommandType.DML,
            description="获取数据",
            syntax="get 'table_name', 'row_key'",
            examples=[
                "get 'users', 'user001'",
                "get 'users', 'user001', 'info:name'",
                "get 'users', 'user001', {COLUMN => 'info:name', VERSIONS => 3}"
            ],
            parameters={
                "table_name": "表名",
                "row_key": "行键",
                "COLUMN": "指定列",
                "VERSIONS": "版本数",
                "TIMERANGE": "时间范围"
            }
        )
        commands["scan"] = ShellCommand(
            command="scan",
            command_type=ShellCommandType.DML,
            description="扫描表数据",
            syntax="scan 'table_name'",
            examples=[
                "scan 'users'",
                "scan 'users', {STARTROW => 'user001', ENDROW => 'user999'}",
                "scan 'users', {COLUMNS => 'info:name', LIMIT => 10}"
            ],
            parameters={
                "table_name": "表名",
                "STARTROW": "起始行键",
                "ENDROW": "结束行键",
                "COLUMNS": "指定列",
                "LIMIT": "限制行数",
                "FILTER": "过滤器"
            }
        )
        commands["delete"] = ShellCommand(
            command="delete",
            command_type=ShellCommandType.DML,
            description="删除数据",
            syntax="delete 'table_name', 'row_key', 'column_family:qualifier'",
            examples=[
                "delete 'users', 'user001', 'info:age'",
                "delete 'users', 'user001', 'info:age', timestamp"
            ],
            parameters={
                "table_name": "表名",
                "row_key": "行键",
                "column": "列名",
                "timestamp": "时间戳(可选)"
            }
        )
        # Administrative commands
        commands["list"] = ShellCommand(
            command="list",
            command_type=ShellCommandType.ADMIN,
            description="列出所有表",
            syntax="list",
            examples=["list", "list 'user.*'"],
            parameters={"pattern": "表名模式(可选)"}
        )
        commands["status"] = ShellCommand(
            command="status",
            command_type=ShellCommandType.ADMIN,
            description="显示集群状态",
            syntax="status",
            examples=["status", "status 'simple'", "status 'detailed'"],
            parameters={"format": "显示格式(simple/detailed)"}
        )
        commands["version"] = ShellCommand(
            command="version",
            command_type=ShellCommandType.ADMIN,
            description="显示HBase版本",
            syntax="version",
            examples=["version"],
            parameters={}
        )
        return commands

    def _command_handlers(self) -> Dict[str, Any]:
        """Map each executable command name to its bound handler method."""
        return {
            "create": self._execute_create,
            "drop": self._execute_drop,
            "describe": self._execute_describe,
            "put": self._execute_put,
            "get": self._execute_get,
            "scan": self._execute_scan,
            "delete": self._execute_delete,
            "list": self._execute_list,
            "status": self._execute_status,
            "version": self._execute_version,
            "help": self._execute_help,
        }

    def execute_command(self, command_line: str) -> Dict[str, Any]:
        """Parse and run one shell line, record it in the history, and return the result.

        The returned dict always has "success", "message" and "execution_time"
        (milliseconds). Errors, including parse errors, are reported in the
        result rather than raised.
        """
        start_time = time.time()
        # Create the history entry before parsing so the error path can always
        # record it (previously a parse failure raised NameError in `except`).
        history_entry = {
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "command": command_line,
            "status": CommandStatus.SUCCESS.value
        }
        try:
            parsed = self._parse_command(command_line)
            command_name = parsed["command"]
            handler = self._command_handlers().get(command_name)
            if handler is None:
                result = {
                    "success": False,
                    "message": f"未知命令: {command_name}",
                    "suggestion": "使用 'help' 查看可用命令"
                }
            else:
                result = handler(parsed["args"])
        except Exception as e:  # shell boundary: report any failure as a result
            result = {
                "success": False,
                "message": f"命令执行错误: {str(e)}",
            }
        result["execution_time"] = round((time.time() - start_time) * 1000, 2)
        # Handlers report failures via "success"; mirror that in the history
        # instead of logging every recognised command as successful.
        if not result.get("success"):
            history_entry["status"] = CommandStatus.FAILED.value
        history_entry["result"] = result
        self.command_history.append(history_entry)
        return result

    def _parse_command(self, command_line: str) -> Dict[str, Any]:
        """Split a shell line into a lower-cased command name and unquoted arguments.

        Raises:
            ValueError: if the line contains no tokens.
        """
        tokens = [
            single or double or bare
            for single, double, bare in self._TOKEN_PATTERN.findall(command_line)
        ]
        if not tokens:
            raise ValueError("空命令")
        return {"command": tokens[0].lower(), "args": tokens[1:]}

    @staticmethod
    def _unquote(token: str) -> str:
        """Strip surrounding single/double quotes left on a raw argument."""
        return token.strip("'\"")

    @staticmethod
    def _missing_table(table_name: str) -> Dict[str, Any]:
        """Failure result for a reference to a table that does not exist."""
        return {
            "success": False,
            "message": f"表 '{table_name}' 不存在"
        }

    def _usage_error(self, command: str, message: str) -> Dict[str, Any]:
        """Failure result for too few arguments, echoing the command's syntax."""
        return {
            "success": False,
            "message": message,
            "syntax": self.shell_commands[command].syntax
        }

    def _execute_create(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``create 'table', 'cf1', ...``; fails if the table already exists."""
        if len(args) < 2:
            return self._usage_error("create", "create命令需要表名和至少一个列族")
        table_name = self._unquote(args[0])
        column_families = [self._unquote(cf) for cf in args[1:]]
        # HBase rejects creating an existing table; never clobber its data.
        if table_name in self.tables:
            return {
                "success": False,
                "message": f"表 '{table_name}' 已存在"
            }
        self.tables[table_name] = {
            "name": table_name,
            "column_families": {
                cf: {"max_versions": 1, "ttl": None, "compression": "NONE"}
                for cf in column_families
            },
            "data": {},
            "created_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        return {
            "success": True,
            "message": f"表 '{table_name}' 创建成功",
            "table_name": table_name,
            "column_families": column_families
        }

    def _execute_drop(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``drop 'table'`` (no disable step is required by the simulator)."""
        if len(args) < 1:
            return self._usage_error("drop", "drop命令需要表名")
        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._missing_table(table_name)
        del self.tables[table_name]
        return {
            "success": True,
            "message": f"表 '{table_name}' 删除成功",
            "table_name": table_name
        }

    def _execute_describe(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``describe 'table'``: return its column families and creation time."""
        if len(args) < 1:
            return self._usage_error("describe", "describe命令需要表名")
        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._missing_table(table_name)
        table_info = self.tables[table_name]
        return {
            "success": True,
            "message": f"表 '{table_name}' 结构信息",
            "table_name": table_name,
            "column_families": table_info["column_families"],
            "created_time": table_info["created_time"]
        }

    def _execute_put(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``put 'table', 'row', 'cf:qualifier', 'value'``.

        Overwrites any existing cell and stamps it with the current time in
        epoch milliseconds. Extra arguments are ignored.
        """
        if len(args) < 4:
            return self._usage_error("put", "put命令需要表名、行键、列名和值")
        table_name, row_key, column, value = (self._unquote(a) for a in args[:4])
        if table_name not in self.tables:
            return self._missing_table(table_name)
        if ':' not in column:
            return {
                "success": False,
                "message": "列名格式错误,应为 'column_family:qualifier'"
            }
        cf, _qualifier = column.split(':', 1)
        table = self.tables[table_name]
        if cf not in table["column_families"]:
            return {
                "success": False,
                "message": f"列族 '{cf}' 不存在"
            }
        table["data"].setdefault(row_key, {})[column] = {
            "value": value,
            "timestamp": int(datetime.now().timestamp() * 1000)
        }
        return {
            "success": True,
            "message": "数据插入成功",
            "table_name": table_name,
            "row_key": row_key,
            "column": column,
            "value": value
        }

    def _execute_get(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``get 'table', 'row'[, 'cf:qualifier' | 'cf']``.

        A third argument containing ':' selects that exact column; a bare
        column family selects every column in that family. A missing row is
        reported as success with empty data.
        """
        if len(args) < 2:
            return self._usage_error("get", "get命令需要表名和行键")
        table_name = self._unquote(args[0])
        row_key = self._unquote(args[1])
        column_filter = self._unquote(args[2]) if len(args) > 2 else None
        if table_name not in self.tables:
            return self._missing_table(table_name)
        table_data = self.tables[table_name]["data"]
        if row_key not in table_data:
            return {
                "success": True,
                "message": f"行 '{row_key}' 不存在",
                "data": {}
            }
        row_data = table_data[row_key]
        if not column_filter:
            filtered_data = dict(row_data)
        elif ':' in column_filter:
            filtered_data = {k: v for k, v in row_data.items() if k == column_filter}
        else:
            prefix = f"{column_filter}:"
            filtered_data = {k: v for k, v in row_data.items() if k.startswith(prefix)}
        return {
            "success": True,
            "message": "获取数据成功",
            "table_name": table_name,
            "row_key": row_key,
            "data": filtered_data
        }

    def _execute_scan(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``scan 'table'``: return every row sorted by row key (options are ignored)."""
        if len(args) < 1:
            return self._usage_error("scan", "scan命令需要表名")
        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._missing_table(table_name)
        table_data = self.tables[table_name]["data"]
        results = [
            {"row_key": row_key, "data": dict(row_data)}
            for row_key, row_data in sorted(table_data.items())
        ]
        return {
            "success": True,
            "message": "扫描完成",
            "table_name": table_name,
            "results": results,
            "count": len(results)
        }

    def _execute_delete(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``delete 'table', 'row', 'cf:qualifier'``; drops the row once it is empty."""
        if len(args) < 3:
            return self._usage_error("delete", "delete命令需要表名、行键和列名")
        table_name = self._unquote(args[0])
        row_key = self._unquote(args[1])
        column = self._unquote(args[2])
        if table_name not in self.tables:
            return self._missing_table(table_name)
        table_data = self.tables[table_name]["data"]
        if row_key not in table_data:
            return {
                "success": False,
                "message": f"行 '{row_key}' 不存在"
            }
        row_data = table_data[row_key]
        if column not in row_data:
            return {
                "success": False,
                "message": f"列 '{column}' 不存在"
            }
        deleted_value = row_data.pop(column)
        if not row_data:
            del table_data[row_key]
        return {
            "success": True,
            "message": "删除成功",
            "table_name": table_name,
            "row_key": row_key,
            "column": column,
            "deleted_value": deleted_value
        }

    def _execute_list(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``list ['pattern']``.

        The pattern is a regular expression matched from the start of the
        table name. A glob-style ``*`` that is not already preceded by ``.``
        (or escaped) is widened to ``.*``, so both ``user*`` and ``user.*`` work.
        """
        pattern = self._unquote(args[0]) if args else None
        table_names = list(self.tables.keys())
        if pattern:
            # The old blanket replace('*', '.*') turned 'user.*' into 'user..*',
            # which no longer matched a table named exactly 'user'.
            regex_pattern = re.sub(r"(?<![.\\])\*", ".*", pattern)
            try:
                matcher = re.compile(regex_pattern)
            except re.error as exc:
                return {
                    "success": False,
                    "message": f"无效的表名模式 '{pattern}': {exc}"
                }
            table_names = [name for name in table_names if matcher.match(name)]
        return {
            "success": True,
            "message": f"找到 {len(table_names)} 个表",
            "tables": table_names
        }

    def _execute_status(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``status ['simple'|'detailed']`` with canned cluster information.

        "regions" is approximated by the number of stored rows.
        """
        format_type = self._unquote(args[0]) if args else "simple"
        status_info = {
            "cluster_id": "hbase-cluster-001",
            "master": "localhost:16000",
            "version": "2.4.9",
            "region_servers": 1,
            "tables": len(self.tables),
            "regions": sum(len(table["data"]) for table in self.tables.values()),
            "uptime": "2 days, 5 hours, 30 minutes"
        }
        if format_type == "detailed":
            status_info["detailed_info"] = {
                "heap_memory": "512MB / 2GB",
                "region_server_list": ["localhost:16020"],
                "table_details": [
                    {"name": name, "regions": len(table["data"])}
                    for name, table in self.tables.items()
                ]
            }
        return {
            "success": True,
            "message": "集群状态",
            "status": status_info
        }

    def _execute_version(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``version``: return fixed version metadata (arguments are ignored)."""
        return {
            "success": True,
            "message": "HBase版本信息",
            "version": "2.4.9",
            "revision": "abc123def456",
            "compiled": "2023-12-01 10:30:00",
            "java_version": "1.8.0_301"
        }

    def _execute_help(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``help ['command']``: one command's details, or all commands grouped by type."""
        if args:
            command_name = self._unquote(args[0]).lower()
            if command_name not in self.shell_commands:
                return {
                    "success": False,
                    "message": f"未知命令: {command_name}"
                }
            cmd = self.shell_commands[command_name]
            return {
                "success": True,
                "message": f"命令 '{command_name}' 帮助信息",
                "command": cmd.command,
                "description": cmd.description,
                "syntax": cmd.syntax,
                "examples": cmd.examples,
                "parameters": cmd.parameters
            }
        commands_by_type: Dict[str, List[Dict[str, str]]] = {}
        for cmd in self.shell_commands.values():
            commands_by_type.setdefault(cmd.command_type.value, []).append({
                "command": cmd.command,
                "description": cmd.description
            })
        return {
            "success": True,
            "message": "HBase Shell 帮助信息",
            "commands": commands_by_type
        }

    def get_command_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the most recent ``limit`` history entries, oldest first.

        A non-positive ``limit`` returns an empty list (``[-0:]`` would
        otherwise return the whole history).
        """
        if limit <= 0:
            return []
        return self.command_history[-limit:]

    def clear_history(self) -> bool:
        """Remove all recorded history entries; always returns True."""
        self.command_history.clear()
        return True
# Usage example: drive the shell simulator through a fixed command sequence.
shell = HBaseShellSimulator()
print("=== HBase Shell 模拟器 ===")
# Execute a series of commands covering admin, DDL and DML operations
commands = [
    "version",
    "status",
    "create 'users', 'info', 'stats'",
    "list",
    "describe 'users'",
    "put 'users', 'user001', 'info:name', 'Zhang San'",
    "put 'users', 'user001', 'info:age', '25'",
    "put 'users', 'user001', 'stats:login_count', '100'",
    "get 'users', 'user001'",
    "get 'users', 'user001', 'info:name'",
    "scan 'users'",
    "delete 'users', 'user001', 'info:age'",
    "get 'users', 'user001'"
]
for cmd in commands:
    print(f"\n> {cmd}")
    result = shell.execute_command(cmd)
    if result["success"]:
        print(f"✅ {result['message']}")
        # Only some commands return these optional keys
        if "data" in result:
            print(f" 数据: {result['data']}")
        if "tables" in result:
            print(f" 表列表: {result['tables']}")
        if "results" in result:
            print(f" 扫描结果: {len(result['results'])} 行")
    else:
        print(f"❌ {result['message']}")
    # execute_command adds execution_time (ms) to every result
    print(f" 执行时间: {result['execution_time']}ms")
# Show the five most recent history entries
print("\n=== 命令历史 ===")
history = shell.get_command_history(5)
for i, entry in enumerate(history, 1):
    # "success" is CommandStatus.SUCCESS.value
    status_icon = "✅" if entry["status"] == "success" else "❌"
    print(f"{i}. {status_icon} [{entry['timestamp']}] {entry['command']}")
2. Java API操作
2.1 连接管理
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
import json
import threading
import time
class ConnectionState(Enum):
    """Lifecycle state of a simulated HBaseConnection."""
    DISCONNECTED = "disconnected"  # initial state, and the state after disconnect()
    CONNECTING = "connecting"  # set while connect() is running
    CONNECTED = "connected"  # the only state in which table operations proceed
    RECONNECTING = "reconnecting"  # not set by any code visible in this file
    FAILED = "failed"  # set when connect() hits an exception
class OperationType(Enum):
    """Kind of table operation recorded in an OperationResult."""
    GET = "get"
    PUT = "put"
    DELETE = "delete"
    SCAN = "scan"
    BATCH = "batch"  # summary entry for a whole batch of operations
@dataclass
class ConnectionConfig:
    """Client settings for a simulated HBase connection.

    Only ``zookeeper_quorum`` is read by the connection code visible in this
    file; the other fields are not read by any visible code.
    """
    zookeeper_quorum: str = "localhost:2181"  # ZooKeeper address, e.g. "host:port"; must be non-empty
    zookeeper_znode_parent: str = "/hbase"
    client_timeout: int = 30000  # milliseconds (30 s)
    rpc_timeout: int = 60000  # milliseconds (60 s)
    retries: int = 3
    retry_pause: int = 1000  # milliseconds (1 s)
    max_connections: int = 100
    connection_pool_size: int = 10
@dataclass
class OperationResult:
    """Outcome of one table operation (success flag, payload or error, timing)."""
    success: bool
    operation_type: OperationType
    table_name: str
    row_key: Optional[str] = None  # None for operations that are not row-scoped (e.g. scan)
    data: Optional[Dict[str, Any]] = None  # operation payload; None on failure
    error_message: Optional[str] = None  # str() of the exception on failure
    execution_time: float = 0.0  # elapsed time; the table code in this file stores milliseconds
    timestamp: Optional[str] = None  # filled in by __post_init__ when not supplied

    def __post_init__(self):
        """Default ``timestamp`` to the current local time with millisecond precision."""
        if self.timestamp is None:
            # %f yields microseconds (6 digits); dropping the last 3 keeps milliseconds
            self.timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
class HBaseConnection:
    """Simulated HBase connection that tracks its state and usage counters."""

    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.state = ConnectionState.DISCONNECTED
        self.connection_time: Optional[datetime] = None
        self.last_activity: Optional[datetime] = None
        self.operation_count = 0
        self.error_count = 0
        # Serializes connect()/disconnect(); the counters are not guarded by it.
        self._lock = threading.Lock()

    def connect(self) -> bool:
        """Simulate opening the connection.

        Sleeps 0.1 s to mimic latency, then requires a non-empty
        ``config.zookeeper_quorum``.

        Returns:
            True on success; False (state FAILED) if anything goes wrong.
        """
        with self._lock:
            try:
                self.state = ConnectionState.CONNECTING
                time.sleep(0.1)  # simulated connection latency
                if not self.config.zookeeper_quorum:
                    raise ValueError("ZooKeeper地址不能为空")
                now = datetime.now()
                self.state = ConnectionState.CONNECTED
                self.connection_time = now
                self.last_activity = now
                print(f"连接到HBase成功: {self.config.zookeeper_quorum}")
                return True
            except Exception as e:
                self.state = ConnectionState.FAILED
                print(f"连接失败: {e}")
                return False

    def disconnect(self) -> bool:
        """Mark a CONNECTED connection as DISCONNECTED.

        Returns True even when there was nothing to disconnect; False only if
        an unexpected error occurs (reported, not raised).
        """
        with self._lock:
            try:
                if self.state == ConnectionState.CONNECTED:
                    self.state = ConnectionState.DISCONNECTED
                    print("连接已断开")
                return True
            except Exception as e:
                print(f"断开连接失败: {e}")
                return False

    def is_connected(self) -> bool:
        """Return True only while the state is CONNECTED."""
        return self.state == ConnectionState.CONNECTED

    def get_connection_info(self) -> Dict[str, Any]:
        """Return a JSON-friendly snapshot of state, timings and counters.

        ``uptime_seconds`` is only reported while connected; previously it
        kept growing after disconnect() because ``connection_time`` is kept.
        """
        uptime = None
        if self.connection_time and self.state == ConnectionState.CONNECTED:
            uptime = (datetime.now() - self.connection_time).total_seconds()
        return {
            "state": self.state.value,
            "zookeeper_quorum": self.config.zookeeper_quorum,
            "connection_time": self.connection_time.strftime('%Y-%m-%d %H:%M:%S') if self.connection_time else None,
            "uptime_seconds": uptime,
            "operation_count": self.operation_count,
            "error_count": self.error_count,
            "last_activity": self.last_activity.strftime('%Y-%m-%d %H:%M:%S') if self.last_activity else None
        }

    def _update_activity(self):
        """Record a successful operation: refresh last_activity and bump operation_count."""
        self.last_activity = datetime.now()
        self.operation_count += 1
class HBaseTable:
    """Simulated HBase table handle backed by an in-process dict.

    Every public data operation returns an :class:`OperationResult`; failures
    (including a missing connection) are captured in the result rather than
    raised. Each result is also appended to ``self.operation_history``.
    """

    def __init__(self, connection: HBaseConnection, table_name: str):
        self.connection = connection
        self.table_name = table_name
        self.operation_history: List[OperationResult] = []
        # row_key -> {"family:qualifier": {"value": ..., "timestamp": ...}}
        self._data_store: Dict[str, Dict[str, Any]] = {}

    def _ensure_connected(self) -> None:
        """Raise ``Exception("连接未建立")`` unless the connection is CONNECTED."""
        if not self.connection.is_connected():
            raise Exception("连接未建立")

    def _record(self, operation_type: OperationType, start_time: float,
                row_key: Optional[str] = None,
                data: Optional[Dict[str, Any]] = None,
                error: Optional[Exception] = None) -> OperationResult:
        """Build the result of a finished operation, append it to history and return it.

        On success the connection's activity counters are updated; on failure
        its ``error_count`` is incremented instead and ``data`` is dropped.
        """
        if error is None:
            self.connection._update_activity()
        else:
            self.connection.error_count += 1
        result = OperationResult(
            success=error is None,
            operation_type=operation_type,
            table_name=self.table_name,
            row_key=row_key,
            data=data if error is None else None,
            error_message=None if error is None else str(error),
            execution_time=round((time.time() - start_time) * 1000, 2)
        )
        self.operation_history.append(result)
        return result

    @staticmethod
    def _copy_cells(cells: Dict[str, Any]) -> Dict[str, Any]:
        """Copy a column -> cell mapping so callers cannot mutate stored cells."""
        return {column: dict(cell) for column, cell in cells.items()}

    def put(self, row_key: str, column_family: str, qualifier: str,
            value: Any, timestamp: Optional[int] = None) -> OperationResult:
        """Write ``value`` into ``column_family:qualifier`` of ``row_key``.

        Args:
            timestamp: cell timestamp; defaults to now in epoch milliseconds.
                An explicit 0 is honoured (``timestamp or now`` used to drop it).
        """
        start_time = time.time()
        try:
            self._ensure_connected()
            column_name = f"{column_family}:{qualifier}"
            if timestamp is None:
                timestamp = int(datetime.now().timestamp() * 1000)
            cell_data = {"value": value, "timestamp": timestamp}
            # Build the cell before touching the store so a failure leaves no empty row.
            self._data_store.setdefault(row_key, {})[column_name] = cell_data
        except Exception as e:
            return self._record(OperationType.PUT, start_time, row_key=row_key, error=e)
        return self._record(OperationType.PUT, start_time, row_key=row_key,
                            data={column_name: dict(cell_data)})

    def get(self, row_key: str, column_family: Optional[str] = None,
            qualifier: Optional[str] = None) -> OperationResult:
        """Read one row, optionally narrowed to a family or a single column.

        A missing row or column yields a successful result with empty data.
        """
        start_time = time.time()
        try:
            self._ensure_connected()
            row_data = self._data_store.get(row_key, {})
            if column_family and qualifier:
                column_name = f"{column_family}:{qualifier}"
                selected = {column_name: row_data[column_name]} if column_name in row_data else {}
            elif column_family:
                prefix = f"{column_family}:"
                selected = {k: v for k, v in row_data.items() if k.startswith(prefix)}
            else:
                selected = row_data
            result_data = self._copy_cells(selected)
        except Exception as e:
            return self._record(OperationType.GET, start_time, row_key=row_key, error=e)
        return self._record(OperationType.GET, start_time, row_key=row_key, data=result_data)

    def delete(self, row_key: str, column_family: Optional[str] = None,
               qualifier: Optional[str] = None) -> OperationResult:
        """Delete a column, a whole family, or the entire row.

        The removed cells are returned as the result data; rows left without
        any cell are dropped from the store.
        """
        start_time = time.time()
        try:
            self._ensure_connected()
            deleted_data: Dict[str, Any] = {}
            row = self._data_store.get(row_key)
            if row is not None:
                if column_family and qualifier:
                    column_name = f"{column_family}:{qualifier}"
                    if column_name in row:
                        deleted_data[column_name] = row.pop(column_name)
                elif column_family:
                    prefix = f"{column_family}:"
                    for key in [k for k in row if k.startswith(prefix)]:
                        deleted_data[key] = row.pop(key)
                else:
                    deleted_data = self._data_store.pop(row_key)
                if row_key in self._data_store and not row:
                    del self._data_store[row_key]
        except Exception as e:
            return self._record(OperationType.DELETE, start_time, row_key=row_key, error=e)
        return self._record(OperationType.DELETE, start_time, row_key=row_key, data=deleted_data)

    def scan(self, start_row: Optional[str] = None, end_row: Optional[str] = None,
             limit: int = 100) -> OperationResult:
        """Return up to ``limit`` rows in row-key order within [start_row, end_row).

        Empty/None bounds are open. Result data is
        ``{"results": [{"row_key", "data"}, ...], "count": n}``.
        """
        start_time = time.time()
        try:
            self._ensure_connected()
            row_keys = sorted(self._data_store)
            if start_row:
                row_keys = [rk for rk in row_keys if rk >= start_row]
            if end_row:
                row_keys = [rk for rk in row_keys if rk < end_row]
            scan_results = [
                {"row_key": rk, "data": self._copy_cells(self._data_store[rk])}
                for rk in row_keys[:limit]
            ]
        except Exception as e:
            return self._record(OperationType.SCAN, start_time, error=e)
        return self._record(OperationType.SCAN, start_time,
                            data={"results": scan_results, "count": len(scan_results)})

    def get_operation_statistics(self) -> Dict[str, Any]:
        """Summarise ``operation_history`` overall and per operation type.

        Times are in the units stored in ``execution_time`` and rounded to 2 decimals.
        """
        history = self.operation_history
        if not history:
            return {"total_operations": 0}
        type_stats: Dict[str, Dict[str, Any]] = {}
        total_time = 0
        success_count = 0
        for op in history:
            stats = type_stats.setdefault(op.operation_type.value,
                                          {"count": 0, "success": 0, "total_time": 0})
            stats["count"] += 1
            stats["total_time"] += op.execution_time
            if op.success:
                stats["success"] += 1
                success_count += 1
            total_time += op.execution_time
        for stats in type_stats.values():
            stats["avg_time"] = round(stats["total_time"] / stats["count"], 2)
            stats["success_rate"] = round(stats["success"] / stats["count"] * 100, 2)
            stats["total_time"] = round(stats["total_time"], 2)
        total = len(history)
        return {
            "total_operations": total,
            "success_count": success_count,
            "success_rate": round(success_count / total * 100, 2),
            "total_execution_time": round(total_time, 2),
            "avg_execution_time": round(total_time / total, 2),
            "operations_by_type": type_stats
        }
class HBaseClient:
    """Facade that owns one HBaseConnection and caches one HBaseTable per name."""

    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.connection = HBaseConnection(config)
        self.tables: Dict[str, HBaseTable] = {}

    def connect(self) -> bool:
        """Open the underlying connection and report whether it succeeded."""
        return self.connection.connect()

    def disconnect(self) -> bool:
        """Close the underlying connection; passes through its return value."""
        return self.connection.disconnect()

    def get_table(self, table_name: str) -> HBaseTable:
        """Return the cached handle for ``table_name``, creating it on first use."""
        try:
            return self.tables[table_name]
        except KeyError:
            handle = HBaseTable(self.connection, table_name)
            self.tables[table_name] = handle
            return handle

    def get_client_statistics(self) -> Dict[str, Any]:
        """Combine connection info with per-table operation statistics."""
        return {
            "connection": self.connection.get_connection_info(),
            "tables": {
                name: handle.get_operation_statistics()
                for name, handle in self.tables.items()
            },
            "total_tables_accessed": len(self.tables)
        }
# Usage example: connect, run single-row operations, then print statistics.
# NOTE(review): the source had lost its indentation; the nesting below is
# reconstructed. users_table is only bound inside `if connection_success:`,
# so every use of it is placed in that branch.
print("=== HBase Java API 模拟器 ===")
# Build the connection configuration
config = ConnectionConfig(
    zookeeper_quorum="localhost:2181",
    client_timeout=30000,
    rpc_timeout=60000,
    retries=3
)
# Create the client and connect
client = HBaseClient(config)
connection_success = client.connect()
print(f"连接结果: {connection_success}")
if connection_success:
    # Get (and cache) the table handle
    users_table = client.get_table("users")
    # Run data operations
    print("\n=== 执行数据操作 ===")
    # Insert cells
    put_result = users_table.put("user001", "info", "name", "张三")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    put_result = users_table.put("user001", "info", "age", "25")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    put_result = users_table.put("user001", "stats", "login_count", "100")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    # Read the whole row
    get_result = users_table.get("user001")
    print(f"\nGET操作: {get_result.success}, 耗时: {get_result.execution_time}ms")
    if get_result.success:
        print(f"获取到数据: {len(get_result.data)} 列")
        for column, cell_data in get_result.data.items():
            print(f" {column}: {cell_data['value']}")
    # Read a single column
    get_result = users_table.get("user001", "info", "name")
    print(f"\nGET特定列: {get_result.success}, 数据: {get_result.data}")
    # Scan the table
    scan_result = users_table.scan()
    print(f"\nSCAN操作: {scan_result.success}, 耗时: {scan_result.execution_time}ms")
    if scan_result.success:
        print(f"扫描到 {scan_result.data['count']} 行数据")
    # Delete one column
    delete_result = users_table.delete("user001", "info", "age")
    print(f"\nDELETE操作: {delete_result.success}, 耗时: {delete_result.execution_time}ms")
    # Read again to confirm the deletion
    get_result = users_table.get("user001")
    print(f"删除后GET操作: {get_result.success}, 剩余列数: {len(get_result.data)}")
    # Print statistics
    print("\n=== 操作统计 ===")
    table_stats = users_table.get_operation_statistics()
    print(f"表操作统计: {json.dumps(table_stats, indent=2, ensure_ascii=False)}")
    client_stats = client.get_client_statistics()
    print(f"\n客户端统计: {json.dumps(client_stats, indent=2, ensure_ascii=False)}")
# Disconnect (a no-op that returns True when not connected).
# NOTE(review): the batch example that follows reuses users_table after this call.
client.disconnect()
2.2 批量操作
from typing import List, Tuple
class BatchOperation:
    """Queue of PUT/DELETE operations replayed in order against one HBaseTable.

    ``add_put``/``add_delete`` return ``self`` so calls can be chained.
    """

    def __init__(self, table: HBaseTable):
        self.table = table
        self.operations: List[Dict[str, Any]] = []

    def add_put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> 'BatchOperation':
        """Queue a PUT of ``value`` into ``column_family:qualifier`` of ``row_key``."""
        self.operations.append({
            "type": "put",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier,
            "value": value
        })
        return self

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                   qualifier: Optional[str] = None) -> 'BatchOperation':
        """Queue a DELETE of a column, a column family, or (both None) the whole row."""
        self.operations.append({
            "type": "delete",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier
        })
        return self

    def _run_one(self, op: Dict[str, Any]) -> OperationResult:
        """Execute one queued operation through the table and return its result."""
        op_type = op.get("type")
        if op_type == "put":
            return self.table.put(op["row_key"], op["column_family"], op["qualifier"], op["value"])
        if op_type == "delete":
            return self.table.delete(op["row_key"], op["column_family"], op["qualifier"])
        # Reachable only if `operations` was edited directly. Previously this
        # re-appended the previous iteration's result (or raised NameError).
        return OperationResult(
            success=False,
            operation_type=OperationType.BATCH,
            table_name=self.table.table_name,
            row_key=op.get("row_key"),
            error_message=f"未知操作类型: {op_type}"
        )

    def execute(self) -> List[OperationResult]:
        """Run every queued operation in order.

        Each put/delete records its own result in the table history, and one
        BATCH summary result is appended afterwards. The queue is not cleared.

        Returns:
            The per-operation results, which are empty when the connection is down.
        """
        results: List[OperationResult] = []
        start_time = time.time()
        try:
            if not self.table.connection.is_connected():
                raise Exception("连接未建立")
            for op in self.operations:
                results.append(self._run_one(op))
            batch_result = OperationResult(
                success=all(r.success for r in results),
                operation_type=OperationType.BATCH,
                table_name=self.table.table_name,
                data={
                    "operations_count": len(self.operations),
                    "success_count": sum(1 for r in results if r.success),
                    "results": [{
                        "operation": i,
                        "success": r.success,
                        "error": r.error_message
                    } for i, r in enumerate(results)]
                },
                execution_time=round((time.time() - start_time) * 1000, 2)
            )
        except Exception as e:
            batch_result = OperationResult(
                success=False,
                operation_type=OperationType.BATCH,
                table_name=self.table.table_name,
                error_message=str(e),
                execution_time=round((time.time() - start_time) * 1000, 2)
            )
        self.table.operation_history.append(batch_result)
        return results

    def clear(self) -> 'BatchOperation':
        """Drop all queued operations and return ``self``."""
        self.operations.clear()
        return self
# Batch operation example
print("\n=== 批量操作示例 ===")
# The previous example ends with client.disconnect(). Without reconnecting,
# every queued operation is rejected ("连接未建立") and the scan below returns
# data=None, which crashed on scan_result.data['count'].
if not users_table.connection.is_connected():
    client.connect()
# Build the batch
batch = BatchOperation(users_table)
# Queue several operations (add_* returns the batch for chaining)
(
    batch.add_put("user002", "info", "name", "李四")
    .add_put("user002", "info", "age", "30")
    .add_put("user002", "stats", "login_count", "50")
    .add_put("user003", "info", "name", "王五")
    .add_put("user003", "info", "age", "28")
)
# Run the batch
results = batch.execute()
print(f"批量操作完成,共 {len(results)} 个操作")
for i, result in enumerate(results):
    print(f" 操作 {i+1}: {'成功' if result.success else '失败'}")
# Verify the batch inserts with a full scan
scan_result = users_table.scan()
if scan_result.success:
    print(f"\n表中共有 {scan_result.data['count']} 行数据")
else:
    print(f"\n扫描失败: {scan_result.error_message}")
2.3 过滤器操作
from abc import ABC, abstractmethod
import re
class FilterType(Enum):
    """Kinds of filter target; not referenced by the filter classes visible in this section."""
    ROW_KEY = "row_key"
    COLUMN_FAMILY = "column_family"
    COLUMN_QUALIFIER = "column_qualifier"
    VALUE = "value"
    TIMESTAMP = "timestamp"
class CompareOperator(Enum):
    """Comparison applied by the row-key and value filters."""
    EQUAL = "equal"
    NOT_EQUAL = "not_equal"
    GREATER = "greater"
    GREATER_OR_EQUAL = "greater_or_equal"
    LESS = "less"
    LESS_OR_EQUAL = "less_or_equal"
    REGEX = "regex"  # evaluated with re.match, i.e. anchored at the start of the text
    SUBSTRING = "substring"  # containment test
class HBaseFilter(ABC):
    """Abstract interface implemented by every simulated row filter.

    Concrete filters decide row by row whether to keep a row, and can
    describe their own configuration as a plain dict.
    """

    @abstractmethod
    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True to keep the row identified by ``row_key``."""
        ...

    @abstractmethod
    def get_filter_info(self) -> Dict[str, Any]:
        """Return a dict describing this filter's configuration."""
        ...
class RowKeyFilter(HBaseFilter):
    """Keeps rows whose key satisfies ``<row_key> <operator> <value>``."""

    def __init__(self, operator: CompareOperator, value: str):
        self.operator = operator
        self.value = value

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Evaluate the configured comparison against ``row_key``.

        ``row_data`` is part of the filter interface but is not inspected.
        REGEX uses ``re.match`` (anchored at the start). An operator without
        a rule yields False.
        """
        target = self.value
        rules = (
            (CompareOperator.EQUAL, lambda: row_key == target),
            (CompareOperator.NOT_EQUAL, lambda: row_key != target),
            (CompareOperator.GREATER, lambda: row_key > target),
            (CompareOperator.GREATER_OR_EQUAL, lambda: row_key >= target),
            (CompareOperator.LESS, lambda: row_key < target),
            (CompareOperator.LESS_OR_EQUAL, lambda: row_key <= target),
            (CompareOperator.REGEX, lambda: bool(re.match(target, row_key))),
            (CompareOperator.SUBSTRING, lambda: target in row_key),
        )
        for candidate, rule in rules:
            if self.operator == candidate:
                return rule()
        return False

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this filter as ``{"type", "operator", "value"}``."""
        return {
            "type": "RowKeyFilter",
            "operator": self.operator.value,
            "value": self.value
        }
class ValueFilter(HBaseFilter):
    """Keeps rows whose ``column_family:qualifier`` cell value satisfies a comparison.

    Rows without that column are rejected. EQUAL/NOT_EQUAL compare string
    forms. Ordering operators compare as floats when both sides convert, and
    as strings otherwise. REGEX uses ``re.match`` (anchored at the start).
    """

    def __init__(self, column_family: str, qualifier: str,
                 operator: CompareOperator, value: Any):
        self.column_family = column_family
        self.qualifier = qualifier
        self.operator = operator
        self.value = value
        self.column_name = f"{column_family}:{qualifier}"

    @staticmethod
    def _ordered(left: Any, right: Any, compare: Callable[[Any, Any], bool]) -> bool:
        """Apply ``compare`` numerically if both sides are float-convertible, else on strings."""
        try:
            return compare(float(left), float(right))
        except (TypeError, ValueError):
            # float() raises TypeError for None/containers and ValueError for
            # non-numeric text; previously the TypeError escaped to the caller.
            return compare(str(left), str(right))

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return whether this row's cell value satisfies the configured comparison."""
        if self.column_name not in row_data:
            return False
        cell_value = row_data[self.column_name]["value"]
        op = self.operator
        if op == CompareOperator.EQUAL:
            return str(cell_value) == str(self.value)
        if op == CompareOperator.NOT_EQUAL:
            return str(cell_value) != str(self.value)
        if op == CompareOperator.GREATER:
            return self._ordered(cell_value, self.value, lambda a, b: a > b)
        if op == CompareOperator.GREATER_OR_EQUAL:
            return self._ordered(cell_value, self.value, lambda a, b: a >= b)
        if op == CompareOperator.LESS:
            return self._ordered(cell_value, self.value, lambda a, b: a < b)
        if op == CompareOperator.LESS_OR_EQUAL:
            return self._ordered(cell_value, self.value, lambda a, b: a <= b)
        if op == CompareOperator.REGEX:
            return bool(re.match(str(self.value), str(cell_value)))
        if op == CompareOperator.SUBSTRING:
            return str(self.value) in str(cell_value)
        return False

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this filter as ``{"type", "column", "operator", "value"}``."""
        return {
            "type": "ValueFilter",
            "column": self.column_name,
            "operator": self.operator.value,
            "value": self.value
        }
class FilterList(HBaseFilter):
    """Combine several filters with AND (all must accept) or OR (any may accept).

    An empty list accepts every row. Evaluation short-circuits: under AND it
    stops at the first rejecting filter, under OR at the first accepting one.
    """

    _VALID_OPERATORS = ("AND", "OR")

    def __init__(self, logical_operator: str = "AND"):
        """Create an empty list combined with ``logical_operator`` (case-insensitive).

        Raises:
            ValueError: if ``logical_operator`` is not AND or OR. Previously an
                unknown value silently let every row through.
        """
        normalized = logical_operator.upper()
        if normalized not in self._VALID_OPERATORS:
            raise ValueError(
                f"logical_operator must be 'AND' or 'OR', got {logical_operator!r}"
            )
        self.logical_operator = normalized
        self.filters: List[HBaseFilter] = []

    def add_filter(self, filter_obj: HBaseFilter) -> 'FilterList':
        """Append ``filter_obj``; returns self so calls can be chained."""
        self.filters.append(filter_obj)
        return self

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Apply the child filters with the configured logical operator."""
        if not self.filters:
            return True
        # A generator lets all()/any() stop evaluating as soon as the outcome is known.
        verdicts = (f.filter_row(row_key, row_data) for f in self.filters)
        if self.logical_operator == "OR":
            return any(verdicts)
        return all(verdicts)

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this list and, recursively, each child filter."""
        return {
            "type": "FilterList",
            "logical_operator": self.logical_operator,
            "filters": [f.get_filter_info() for f in self.filters]
        }
class FilteredScan:
    """Scan rows in row-key order with an optional key range, filters and a row limit.

    A row is returned only if every configured filter accepts it; use
    FilterList for OR logic. The key range is ``[start_row, end_row)``, and an
    unset or empty bound is open. At most ``limit`` rows are returned. All
    setters return ``self`` so calls can be chained.
    """

    def __init__(self, table: HBaseTable):
        self.table = table
        self.filters: List[HBaseFilter] = []
        self.start_row: Optional[str] = None  # inclusive lower bound
        self.end_row: Optional[str] = None    # exclusive upper bound
        self.limit: int = 100                 # maximum number of rows returned

    def set_filter(self, filter_obj: HBaseFilter) -> 'FilteredScan':
        """Replace all configured filters with ``filter_obj``."""
        self.filters = [filter_obj]
        return self

    def add_filter(self, filter_obj: HBaseFilter) -> 'FilteredScan':
        """Append ``filter_obj``; it is ANDed with the existing filters."""
        self.filters.append(filter_obj)
        return self

    def set_start_row(self, start_row: str) -> 'FilteredScan':
        """Set the inclusive start row key."""
        self.start_row = start_row
        return self

    def set_end_row(self, end_row: str) -> 'FilteredScan':
        """Set the exclusive end row key."""
        self.end_row = end_row
        return self

    def set_limit(self, limit: int) -> 'FilteredScan':
        """Set the maximum number of rows to return; 0 or less returns no rows."""
        self.limit = limit
        return self

    def execute(self) -> OperationResult:
        """Run the scan and append its result to ``table.operation_history``.

        Returns:
            On success, an OperationResult whose ``data`` holds ``results``
            (a list of ``{"row_key": ..., "data": ...}``), ``count`` and
            ``filters_applied``. On any exception, a failed OperationResult
            carrying ``error_message``; the connection's ``error_count`` is
            incremented.
        """
        start_time = time.time()
        try:
            if not self.table.connection.is_connected():
                raise Exception("连接未建立")

            data_store = self.table._data_store
            row_keys = sorted(data_store.keys())
            # Truthiness checks: None or "" means the bound is open.
            if self.start_row:
                row_keys = [rk for rk in row_keys if rk >= self.start_row]
            if self.end_row:
                row_keys = [rk for rk in row_keys if rk < self.end_row]

            filtered_results = []
            for row_key in row_keys:
                # Checked before a row is considered, so a limit of 0 (or less)
                # returns no rows instead of one.
                if len(filtered_results) >= self.limit:
                    break
                row_data = data_store[row_key]
                # all() stops at the first rejecting filter.
                if all(f.filter_row(row_key, row_data) for f in self.filters):
                    filtered_results.append({
                        "row_key": row_key,
                        "data": row_data
                    })

            self.table.connection._update_activity()
            result = OperationResult(
                success=True,
                operation_type=OperationType.SCAN,
                table_name=self.table.table_name,
                data={
                    "results": filtered_results,
                    "count": len(filtered_results),
                    "filters_applied": [f.get_filter_info() for f in self.filters]
                },
                execution_time=round((time.time() - start_time) * 1000, 2)
            )
            self.table.operation_history.append(result)
            return result
        except Exception as e:
            self.table.connection.error_count += 1
            result = OperationResult(
                success=False,
                operation_type=OperationType.SCAN,
                table_name=self.table.table_name,
                error_message=str(e),
                execution_time=round((time.time() - start_time) * 1000, 2)
            )
            self.table.operation_history.append(result)
            return result
# Filter usage examples.
# Assumes `users_table` (an HBaseTable) was created and populated by an earlier example — TODO confirm.
print("\n=== 过滤器操作示例 ===")
# 1. Row-key filter: keep rows whose key matches r"user00[1-2]".
#    RowKeyFilter uses re.match, so the pattern is anchored at the start of the key.
row_filter = RowKeyFilter(CompareOperator.REGEX, r"user00[1-2]")
filtered_scan = FilteredScan(users_table).set_filter(row_filter)  # set_filter returns the scan, so it chains
result = filtered_scan.execute()
# NOTE(review): on failure FilteredScan.execute builds its result without `data`;
# indexing result.data here assumes the scan succeeded.
print(f"行键过滤结果: {result.data['count']} 行")
for row in result.data['results']:
    print(f" {row['row_key']}")
# 2. Value filter: keep rows whose info:age is greater than "25".
#    ValueFilter compares numerically when both sides convert to float.
value_filter = ValueFilter("info", "age", CompareOperator.GREATER, "25")
filtered_scan = FilteredScan(users_table).set_filter(value_filter)
result = filtered_scan.execute()
print(f"\n年龄大于25的用户: {result.data['count']} 行")
for row in result.data['results']:
    # Cells missing from the row are shown as 'N/A'.
    age = row['data'].get('info:age', {}).get('value', 'N/A')
    name = row['data'].get('info:name', {}).get('value', 'N/A')
    print(f" {row['row_key']}: {name}, 年龄: {age}")
3. 性能优化与最佳实践
3.1 连接池管理
from threading import Lock
from queue import Queue, Empty
import threading
class ConnectionPool:
    """Thread-safe pool of HBaseConnection objects.

    Holds at most ``max_connections`` connections. Idle ones wait in a
    bounded ``Queue``; ``get_connection`` checks one out and
    ``return_connection`` hands it back.

    ``_lock`` guards only the two counters and is never held while blocking
    on the queue, so a thread waiting in ``get_connection`` cannot prevent
    another thread from returning a connection.

    Counters:
        _created_connections: connections owned by the pool (idle or checked
            out); kept <= max_connections.
        _active_connections: connections currently checked out.
    """

    def __init__(self, max_connections: int = 10,
                 connection_config: Optional[ConnectionConfig] = None):
        """Create the pool and pre-open up to three connections.

        Raises:
            ValueError: if ``max_connections`` < 1. ``Queue(maxsize=0)`` would
                be unbounded, and the utilization figure would divide by zero.
        """
        if max_connections < 1:
            raise ValueError("max_connections must be >= 1")
        self.max_connections = max_connections
        self.connection_config = connection_config or ConnectionConfig()
        self._pool: Queue[HBaseConnection] = Queue(maxsize=max_connections)
        self._lock = Lock()
        self._created_connections = 0
        self._active_connections = 0
        self._initialize_pool()

    def _initialize_pool(self):
        """Pre-open up to three idle connections."""
        for _ in range(min(3, self.max_connections)):
            conn = self._create_connection()
            if conn is not None:
                # Cannot overflow: at most max_connections exist, which is the queue's capacity.
                self._pool.put_nowait(conn)

    def _release_slot(self) -> None:
        """Forget one owned connection in the accounting (never below zero)."""
        with self._lock:
            if self._created_connections > 0:
                self._created_connections -= 1

    def _create_connection(self) -> Optional[HBaseConnection]:
        """Open a new connection if capacity remains.

        The slot is reserved under the lock before connecting, so concurrent
        callers cannot exceed ``max_connections``. Connecting itself happens
        outside the lock.

        Returns:
            The new connection, or None if the pool is at capacity or opening
            it raised (the error is printed).
        """
        with self._lock:
            if self._created_connections >= self.max_connections:
                return None
            self._created_connections += 1
        try:
            conn = HBaseConnection(self.connection_config)
            conn.connect()
            return conn
        except Exception as e:
            print(f"创建连接失败: {e}")
            self._release_slot()
            return None

    def get_connection(self, timeout: float = 5.0) -> Optional[HBaseConnection]:
        """Check out a live connection (获取连接).

        Reuses an idle connection if one is queued, otherwise opens a new one
        while under capacity, otherwise waits up to ``timeout`` seconds for
        one to be returned. ``timeout=None`` waits indefinitely. Idle
        connections found disconnected are discarded and their slot is freed.

        Returns:
            A connected HBaseConnection, or None if none became available in time.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                conn = self._pool.get_nowait()
            except Empty:
                conn = self._create_connection()
                if conn is None:
                    # At capacity (or opening failed): wait, without the lock, for a return.
                    remaining = None
                    if deadline is not None:
                        remaining = deadline - time.monotonic()
                        if remaining <= 0:
                            return None
                    try:
                        conn = self._pool.get(timeout=remaining)
                    except Empty:
                        return None
            if conn.is_connected():
                with self._lock:
                    self._active_connections += 1
                return conn
            # Stale connection: free its slot so a replacement can be opened, then retry.
            self._release_slot()

    def return_connection(self, conn: HBaseConnection):
        """Give a checked-out connection back to the pool (归还连接).

        Live connections are queued for reuse. Disconnected ones are dropped
        from the accounting so a replacement can be opened later. ``None`` is
        ignored.
        """
        from queue import Full

        if conn is None:
            return
        with self._lock:
            if self._active_connections > 0:
                self._active_connections -= 1
        if conn.is_connected():
            try:
                self._pool.put_nowait(conn)
                return
            except Full:
                # More connections than queue capacity (e.g. a foreign connection): close the extra.
                conn.disconnect()
        self._release_slot()

    def close_all(self):
        """Disconnect every idle connection (关闭所有连接).

        Checked-out connections are left alone and stay counted. When they
        are returned they are pooled again (if still connected) or dropped.
        """
        while True:
            try:
                conn = self._pool.get_nowait()
            except Empty:
                break
            self._release_slot()
            conn.disconnect()

    def get_pool_stats(self) -> Dict[str, Any]:
        """Return a snapshot of pool counters and utilization (percent of max checked out)."""
        with self._lock:
            return {
                "max_connections": self.max_connections,
                "created_connections": self._created_connections,
                "active_connections": self._active_connections,
                "available_connections": self._pool.qsize(),
                "pool_utilization": round(self._active_connections / self.max_connections * 100, 2)
            }
class PooledHBaseClient:
    """Client that borrows a connection from a ConnectionPool for each operation."""

    def __init__(self, connection_pool: ConnectionPool):
        self.pool = connection_pool

    def execute_operation(self, operation_func, *args, **kwargs):
        """Run ``operation_func(conn, *args, **kwargs)`` on a borrowed connection.

        The connection goes back to the pool even if the operation raises.

        Raises:
            Exception: if the pool yields no connection.
        """
        borrowed = self.pool.get_connection()
        if not borrowed:
            raise Exception("无法获取数据库连接")
        try:
            outcome = operation_func(borrowed, *args, **kwargs)
        finally:
            self.pool.return_connection(borrowed)
        return outcome

    def get_table(self, table_name: str) -> 'PooledHBaseTable':
        """Return a PooledHBaseTable for ``table_name`` that shares this client's pool."""
        return PooledHBaseTable(table_name, self.pool)
class PooledHBaseTable:
    """Table handle that borrows a pooled connection for each call.

    Each operation builds a short-lived HBaseTable on the borrowed connection
    and always returns that connection to the pool afterwards.

    NOTE(review): a new HBaseTable is created per call; whether data written
    through one instance is visible to the next depends on how HBaseTable
    stores data — confirm.
    """

    def __init__(self, table_name: str, connection_pool: ConnectionPool):
        self.table_name = table_name
        self.pool = connection_pool

    def put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> OperationResult:
        """Write ``value`` to cell ``column_family:qualifier`` of ``row_key``.

        Raises:
            Exception: if no connection can be obtained from the pool.
        """
        def _put_operation(conn: HBaseConnection, rk, cf, q, v):
            table = HBaseTable(self.table_name, conn)
            return table.put(rk, cf, q, v)
        # Exactly one connection is borrowed, and returned, per call.
        return self._execute_with_connection(_put_operation, row_key, column_family, qualifier, value)

    def get(self, row_key: str, column_family: Optional[str] = None,
            qualifier: Optional[str] = None) -> OperationResult:
        """Read ``row_key``, optionally narrowed to a family and/or qualifier.

        Raises:
            Exception: if no connection can be obtained from the pool.
        """
        def _get_operation(conn: HBaseConnection, rk, cf, q):
            table = HBaseTable(self.table_name, conn)
            return table.get(rk, cf, q)
        return self._execute_with_connection(_get_operation, row_key, column_family, qualifier)

    def _execute_with_connection(self, operation_func, *args, **kwargs):
        """Run ``operation_func(conn, *args, **kwargs)`` on a borrowed connection, always returning it."""
        conn = self.pool.get_connection()
        if not conn:
            raise Exception("无法获取数据库连接")
        try:
            return operation_func(conn, *args, **kwargs)
        finally:
            self.pool.return_connection(conn)
# Connection pool usage example.
print("\n=== 连接池使用示例 ===")
# Create the pool.
pool = ConnectionPool(max_connections=5)
try:
    print(f"连接池创建完成: {pool.get_pool_stats()}")
    # Client and table handles that borrow pooled connections per call.
    pooled_client = PooledHBaseClient(pool)
    pooled_table = pooled_client.get_table("users")
    # Run operations.
    result = pooled_table.put("user004", "info", "name", "赵六")
    print(f"PUT操作结果: {'成功' if result.success else '失败'}")
    result = pooled_table.get("user004")
    print(f"GET操作结果: {'成功' if result.success else '失败'}")
    # Inspect pool state.
    print(f"连接池状态: {pool.get_pool_stats()}")
finally:
    # Close the pool even if an operation above raised.
    pool.close_all()
3.2 批量操作优化
class OptimizedBatchOperation:
    """Buffer puts/deletes for one table and apply them in row-key order.

    With ``auto_flush`` enabled (the default) the buffer is flushed as soon as
    it holds ``batch_size`` operations; call ``flush()`` for any remainder.
    """

    def __init__(self, table: 'HBaseTable', batch_size: int = 100):
        self.table = table
        self.batch_size = batch_size
        self.operations: List[Dict[str, Any]] = []  # pending operations, insertion order
        self.auto_flush = True
        self.total_operations = 0       # operations handed to flush() so far
        self.successful_operations = 0  # of those, results whose .success is truthy

    def set_auto_flush(self, auto_flush: bool) -> 'OptimizedBatchOperation':
        """Enable or disable automatic flushing; returns self for chaining."""
        self.auto_flush = auto_flush
        return self

    def _maybe_auto_flush(self) -> None:
        """Flush when auto-flush is on and the buffer has reached batch_size."""
        if self.auto_flush and len(self.operations) >= self.batch_size:
            self.flush()

    def add_put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> 'OptimizedBatchOperation':
        """Queue a PUT of ``value`` into ``column_family:qualifier`` of ``row_key``."""
        self.operations.append({
            "type": "put",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier,
            "value": value
        })
        self._maybe_auto_flush()
        return self

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                   qualifier: Optional[str] = None) -> 'OptimizedBatchOperation':
        """Queue a DELETE for ``row_key``, optionally narrowed to a family/qualifier."""
        self.operations.append({
            "type": "delete",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier
        })
        self._maybe_auto_flush()
        return self

    def _apply(self, op: Dict[str, Any]):
        """Execute one buffered operation on the table; None for an unknown type."""
        if op["type"] == "put":
            return self.table.put(op["row_key"], op["column_family"], op["qualifier"], op["value"])
        if op["type"] == "delete":
            return self.table.delete(op["row_key"], op["column_family"], op["qualifier"])
        return None

    def flush(self) -> List['OperationResult']:
        """Apply all pending operations, sorted by row key (刷新操作).

        The sort is stable, so operations on the same row keep their
        submission order. Every pending operation is counted in
        ``total_operations`` and removed from the buffer, even if one raises
        part-way through. In that case the error is printed and the remaining
        operations are dropped. This keeps ``success_rate`` at or below 100%.
        Operations of an unknown type are skipped and count as unsuccessful.

        Returns:
            The result of each operation that completed, in execution order.
        """
        if not self.operations:
            return []
        batch = list(self.operations)
        self.operations.clear()
        self.total_operations += len(batch)

        results = []
        try:
            # Sorting by row key keeps operations on the same row adjacent.
            for op in sorted(batch, key=lambda o: o["row_key"]):
                result = self._apply(op)
                if result is None:
                    continue
                results.append(result)
                if result.success:
                    self.successful_operations += 1
        except Exception as e:
            print(f"批量操作执行失败: {e}")
        return results

    def get_statistics(self) -> Dict[str, Any]:
        """Return totals, success rate (percent, 2 decimals) and pending count."""
        return {
            "total_operations": self.total_operations,
            "successful_operations": self.successful_operations,
            "success_rate": round(self.successful_operations / max(self.total_operations, 1) * 100, 2),
            "pending_operations": len(self.operations)
        }
# Optimized batch operation example.
# Assumes `users_table` was created by an earlier example — TODO confirm.
print("\n=== 优化批量操作示例 ===")
optimized_batch = OptimizedBatchOperation(users_table, batch_size=3)
optimized_batch.set_auto_flush(True)  # already the default; shown for clarity
# Queue 20 puts (two per user). With batch_size=3, auto-flush fires each time 3 are pending.
for i in range(10):
    optimized_batch.add_put(f"batch_user_{i:03d}", "info", "name", f"批量用户{i}")
    optimized_batch.add_put(f"batch_user_{i:03d}", "info", "age", str(20 + i))
# Flush what is still pending (20 % 3 = 2 operations).
optimized_batch.flush()
# Show the statistics.
stats = optimized_batch.get_statistics()
print(f"批量操作统计: {stats}")
3.3 错误处理与重试机制
import random
from functools import wraps
class RetryConfig:
    """Exponential-backoff retry settings.

    Attributes:
        max_retries: retries allowed after the first attempt.
        base_delay: delay in seconds before the first retry.
        max_delay: upper bound in seconds for any single delay.
        backoff_factor: multiplier applied to the delay for each further retry.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, backoff_factor: float = 2.0):
        (self.max_retries,
         self.base_delay,
         self.max_delay,
         self.backoff_factor) = (max_retries, base_delay, max_delay, backoff_factor)
def retry_on_failure(retry_config: 'RetryConfig'):
    """Build a decorator that retries a call with exponential backoff (重试装饰器).

    The decorated function is attempted up to ``max_retries + 1`` times. A
    negative ``max_retries`` is treated as 0, so the function always runs at
    least once. Any ``Exception`` triggers a retry. Before retry k (0-based)
    the wrapper sleeps ``min(base_delay * backoff_factor**k, max_delay)``
    seconds plus up to 10% random jitter. When attempts are exhausted, the
    last exception is re-raised unchanged.

    ``retry_config`` is read on every call, so later changes to it take effect.

    Args:
        retry_config: object providing ``max_retries``, ``base_delay``,
            ``max_delay`` and ``backoff_factor`` (normally a RetryConfig).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            max_retries = max(retry_config.max_retries, 0)
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt >= max_retries:
                        print(f"操作最终失败,已重试 {max_retries} 次")
                        raise
                    # Exponential backoff capped at max_delay, plus random jitter
                    # so many clients do not retry in lockstep.
                    delay = min(
                        retry_config.base_delay * (retry_config.backoff_factor ** attempt),
                        retry_config.max_delay
                    )
                    jitter = random.uniform(0, delay * 0.1)  # up to 10% jitter
                    total_delay = max(delay + jitter, 0.0)  # time.sleep rejects negative values
                    print(f"操作失败,{total_delay:.2f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(total_delay)
        return wrapper
    return decorator
class RobustHBaseClient:
    """HBase client wrapper with connection retries and error tracking.

    Connecting is retried according to the ``retry_config`` passed to the
    constructor. Each failed connection attempt increments ``error_count``
    and records ``last_error_time`` (a ``time.time()`` timestamp); both are
    reported by ``get_error_statistics``.
    """

    def __init__(self, connection_config: ConnectionConfig, retry_config: RetryConfig):
        self.connection_config = connection_config
        self.retry_config = retry_config
        self.connection: Optional[HBaseConnection] = None
        self.error_count = 0
        self.last_error_time: Optional[float] = None

    def _record_error(self) -> None:
        """Count one failed attempt and remember when it happened."""
        self.error_count += 1
        self.last_error_time = time.time()

    def _connect_once(self) -> None:
        """Make a single connection attempt; no-op if already connected.

        Raises:
            Exception: if the new connection does not report itself connected.
        """
        if self.connection and self.connection.is_connected():
            return
        self.connection = HBaseConnection(self.connection_config)
        self.connection.connect()
        if not self.connection.is_connected():
            raise Exception("连接失败")

    def connect(self):
        """Connect to HBase, retrying according to ``self.retry_config``.

        Raises:
            Exception: the last attempt's exception once retries are exhausted.
        """
        def attempt():
            try:
                self._connect_once()
            except Exception:
                self._record_error()
                raise
        retry_on_failure(self.retry_config)(attempt)()

    def _ensure_connected(self):
        """Connect (with retries) unless a live connection already exists."""
        if not self.connection or not self.connection.is_connected():
            self.connect()

    def get_table(self, table_name: str) -> 'RobustHBaseTable':
        """Return a RobustHBaseTable for ``table_name`` that shares this client's connection and retry policy.

        ``connect`` already retries, so this method adds no retry layer of its
        own; nesting the two would multiply the number of attempts.
        """
        self._ensure_connected()
        return RobustHBaseTable(table_name, self.connection, self.retry_config)

    def get_error_statistics(self) -> Dict[str, Any]:
        """Return the error count, last error time and current connection info (or None)."""
        return {
            "error_count": self.error_count,
            "last_error_time": self.last_error_time,
            "connection_status": self.connection.get_connection_info() if self.connection else None
        }
class RobustHBaseTable:
    """HBaseTable wrapper whose operations are retried per ``retry_config``.

    Every operation uses the same ``retry_config`` supplied at construction.
    Only raised exceptions trigger a retry.
    NOTE(review): if HBaseTable reports failures through a returned
    OperationResult (success=False) rather than raising, those results are
    returned as-is without retrying — confirm against HBaseTable.
    """

    def __init__(self, table_name: str, connection: HBaseConnection, retry_config: RetryConfig):
        self.table_name = table_name
        self.connection = connection
        self.retry_config = retry_config
        self._table = HBaseTable(table_name, connection)

    def _call_with_retry(self, operation, *args):
        """Invoke ``operation(*args)``, retrying raised exceptions per ``self.retry_config``."""
        return retry_on_failure(self.retry_config)(operation)(*args)

    def put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> OperationResult:
        """Write ``value`` to cell ``column_family:qualifier`` of ``row_key``."""
        return self._call_with_retry(self._table.put, row_key, column_family, qualifier, value)

    def get(self, row_key: str, column_family: Optional[str] = None,
            qualifier: Optional[str] = None) -> OperationResult:
        """Read ``row_key``, optionally narrowed to a family and/or qualifier."""
        return self._call_with_retry(self._table.get, row_key, column_family, qualifier)

    def scan(self, start_row: Optional[str] = None, end_row: Optional[str] = None,
             limit: int = 100) -> OperationResult:
        """Scan rows between ``start_row`` and ``end_row``, returning at most ``limit``."""
        return self._call_with_retry(self._table.scan, start_row, end_row, limit)
# Error handling and retry example.
print("\n=== 错误处理与重试示例 ===")
# Retry policy handed to the client: max_retries=2, base_delay=0.5 s (other fields default).
retry_config = RetryConfig(max_retries=2, base_delay=0.5)
robust_client = RobustHBaseClient(ConnectionConfig(), retry_config)
try:
    robust_client.connect()
    robust_table = robust_client.get_table("users")
    # Run operations.
    result = robust_table.put("robust_user", "info", "name", "健壮用户")
    print(f"PUT操作: {'成功' if result.success else '失败'}")
    result = robust_table.get("robust_user")
    print(f"GET操作: {'成功' if result.success else '失败'}")
except Exception as e:
    # Reached once the retries for a call are exhausted.
    print(f"操作失败: {e}")
# Report error statistics whether or not the block above failed.
print(f"错误统计: {robust_client.get_error_statistics()}")
4. 总结
4.1 关键要点
HBase Shell基础
- 掌握基本的DDL和DML命令
- 理解命令语法和参数
- 熟练使用过滤器和扫描
Java API操作
- 连接管理和配置
- 表操作和数据操作
- 批量操作和过滤器
性能优化
- 使用连接池管理连接
- 批量操作提高吞吐量
- 合理设置批次大小
错误处理
- 实现重试机制
- 监控连接状态
- 记录和分析错误
4.2 最佳实践
连接管理
- 使用连接池避免频繁创建连接
- 及时释放不用的连接
- 监控连接池状态
批量操作
- 合理设置批次大小(通常100-1000)
- 按行键排序提高性能
- 使用自动刷新机制
错误处理
- 实现指数退避重试
- 添加随机抖动避免惊群效应(thundering herd,大量客户端同时重试)
- 记录详细的错误信息
性能监控
- 监控操作延迟
- 跟踪成功率
- 分析性能瓶颈
4.3 下一步学习
- HBase高级操作与客户端API:深入学习协处理器、计数器等高级功能
- HBase集群管理:了解集群部署、监控和运维
- HBase性能调优:学习存储优化、查询优化等技术
- HBase与其他系统集成:探索与Spark、MapReduce等的集成方案
# 3. Combined filter: row key matches r"user.*" AND info:age <= 30.
#    add_filter returns the FilterList, so the calls can be chained.
filter_list = (
    FilterList("AND")
    .add_filter(RowKeyFilter(CompareOperator.REGEX, r"user.*"))
    .add_filter(ValueFilter("info", "age", CompareOperator.LESS_OR_EQUAL, "30"))
)
filtered_scan = FilteredScan(users_table).set_filter(filter_list)
result = filtered_scan.execute()
print(f"\n组合过滤结果: {result.data['count']} 行")
for row in result.data['results']:
    # Cells missing from the row are shown as 'N/A'.
    age = row['data'].get('info:age', {}).get('value', 'N/A')
    name = row['data'].get('info:name', {}).get('value', 'N/A')
    print(f" {row['row_key']}: {name}, 年龄: {age}")