1. HBase Shell操作

1.1 Shell基础命令

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json
import re
import time

class ShellCommandType(Enum):
    """Category a simulated shell command belongs to."""
    DDL = "ddl"  # data definition language (table structure)
    DML = "dml"  # data manipulation language (cell reads and writes)
    ADMIN = "admin"  # administrative commands
    UTILITY = "utility"  # utility commands

class CommandStatus(Enum):
    """Outcome recorded for an executed shell command."""
    SUCCESS = "success"
    FAILED = "failed"
    PARTIAL = "partial"
    TIMEOUT = "timeout"

@dataclass
class ShellCommand:
    """Help metadata describing one shell command."""
    # Command keyword as typed in the shell, e.g. "create"
    command: str
    # Category used to group commands in the help output
    command_type: ShellCommandType
    # Short human-readable description
    description: str
    # Usage pattern shown when the command is called incorrectly
    syntax: str
    # Example invocations
    examples: List[str]
    # Parameter name -> explanation
    parameters: Dict[str, str]
    
class HBaseShellSimulator:
    """In-memory simulator for a small subset of the HBase shell.

    Tables are kept in ``self.tables`` as plain dictionaries that hold one
    cell per column (no versioning). Every call to :meth:`execute_command`
    appends an entry to ``self.command_history``.
    """

    # Commands dispatched to the method named ``_execute_<command>``.
    _SUPPORTED_COMMANDS = frozenset({
        "create", "drop", "describe", "put", "get", "scan",
        "delete", "list", "status", "version", "help",
    })

    # One shell argument: an option hash ``{...}``, a quoted string (which may
    # contain spaces and commas) or a bare word. Separating commas and
    # whitespace are never part of a token.
    _TOKEN_RE = re.compile(
        r"\{[^}]*\}"
        r"|'[^']*'"
        r'|"[^"]*"'
        r"|[^\s,]+"
    )

    # One ``KEY => value`` pair inside an option hash.
    _OPTION_RE = re.compile(
        r"(\w+)\s*=>\s*"
        r"('[^']*'"
        r'|"[^"]*"'
        r"|[^\s,}]+)"
    )

    def __init__(self):
        self.tables: Dict[str, Dict[str, Any]] = {}
        self.command_history: List[Dict[str, Any]] = []
        self.current_namespace = "default"
        self.shell_commands = self._initialize_commands()

    # ------------------------------------------------------------------
    # Command metadata
    # ------------------------------------------------------------------

    def _initialize_commands(self) -> Dict[str, "ShellCommand"]:
        """Build the help metadata for every documented command.

        Returns:
            Mapping of command keyword to its ``ShellCommand`` description,
            in DDL, DML, admin order.
        """
        specs = [
            # DDL commands
            ShellCommand(
                command="create",
                command_type=ShellCommandType.DDL,
                description="创建表",
                syntax="create 'table_name', 'column_family1', 'column_family2', ...",
                examples=[
                    "create 'users', 'info', 'stats'",
                    "create 'logs', {NAME => 'data', VERSIONS => 3}",
                ],
                parameters={
                    "table_name": "表名",
                    "column_family": "列族名称",
                    "VERSIONS": "最大版本数",
                    "TTL": "生存时间(秒)",
                    "COMPRESSION": "压缩算法",
                },
            ),
            ShellCommand(
                command="drop",
                command_type=ShellCommandType.DDL,
                description="删除表",
                syntax="drop 'table_name'",
                examples=["drop 'users'"],
                parameters={"table_name": "要删除的表名"},
            ),
            ShellCommand(
                command="describe",
                command_type=ShellCommandType.DDL,
                description="描述表结构",
                syntax="describe 'table_name'",
                examples=["describe 'users'"],
                parameters={"table_name": "要描述的表名"},
            ),
            # DML commands
            ShellCommand(
                command="put",
                command_type=ShellCommandType.DML,
                description="插入或更新数据",
                syntax="put 'table_name', 'row_key', 'column_family:qualifier', 'value'",
                examples=[
                    "put 'users', 'user001', 'info:name', 'Zhang San'",
                    "put 'users', 'user001', 'info:age', '25'",
                ],
                parameters={
                    "table_name": "表名",
                    "row_key": "行键",
                    "column": "列名(列族:限定符)",
                    "value": "值",
                    "timestamp": "时间戳(可选)",
                },
            ),
            ShellCommand(
                command="get",
                command_type=ShellCommandType.DML,
                description="获取数据",
                syntax="get 'table_name', 'row_key'",
                examples=[
                    "get 'users', 'user001'",
                    "get 'users', 'user001', 'info:name'",
                    "get 'users', 'user001', {COLUMN => 'info:name', VERSIONS => 3}",
                ],
                parameters={
                    "table_name": "表名",
                    "row_key": "行键",
                    "COLUMN": "指定列",
                    "VERSIONS": "版本数",
                    "TIMERANGE": "时间范围",
                },
            ),
            ShellCommand(
                command="scan",
                command_type=ShellCommandType.DML,
                description="扫描表数据",
                syntax="scan 'table_name'",
                examples=[
                    "scan 'users'",
                    "scan 'users', {STARTROW => 'user001', ENDROW => 'user999'}",
                    "scan 'users', {COLUMNS => 'info:name', LIMIT => 10}",
                ],
                parameters={
                    "table_name": "表名",
                    "STARTROW": "起始行键",
                    "ENDROW": "结束行键",
                    "COLUMNS": "指定列",
                    "LIMIT": "限制行数",
                    "FILTER": "过滤器",
                },
            ),
            ShellCommand(
                command="delete",
                command_type=ShellCommandType.DML,
                description="删除数据",
                syntax="delete 'table_name', 'row_key', 'column_family:qualifier'",
                examples=[
                    "delete 'users', 'user001', 'info:age'",
                    "delete 'users', 'user001', 'info:age', timestamp",
                ],
                parameters={
                    "table_name": "表名",
                    "row_key": "行键",
                    "column": "列名",
                    "timestamp": "时间戳(可选)",
                },
            ),
            # Administrative commands
            ShellCommand(
                command="list",
                command_type=ShellCommandType.ADMIN,
                description="列出所有表",
                syntax="list",
                examples=["list", "list 'user.*'"],
                parameters={"pattern": "表名模式(可选)"},
            ),
            ShellCommand(
                command="status",
                command_type=ShellCommandType.ADMIN,
                description="显示集群状态",
                syntax="status",
                examples=["status", "status 'simple'", "status 'detailed'"],
                parameters={"format": "显示格式(simple/detailed)"},
            ),
            ShellCommand(
                command="version",
                command_type=ShellCommandType.ADMIN,
                description="显示HBase版本",
                syntax="version",
                examples=["version"],
                parameters={},
            ),
        ]
        return {spec.command: spec for spec in specs}

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _unquote(token: str) -> str:
        """Strip surrounding single/double quotes from a raw argument."""
        return token.strip("'\"")

    @classmethod
    def _parse_options(cls, token: str) -> Optional[Dict[str, str]]:
        """Parse an option hash such as ``{NAME => 'data', VERSIONS => 3}``.

        Returns:
            Mapping of upper-cased option name to its unquoted string value,
            or ``None`` when *token* is not wrapped in braces.
        """
        token = token.strip()
        if not (token.startswith("{") and token.endswith("}")):
            return None
        return {
            key.upper(): cls._unquote(raw_value)
            for key, raw_value in cls._OPTION_RE.findall(token)
        }

    def _usage_error(self, command: str, message: str) -> Dict[str, Any]:
        """Build the failure result for a command called with bad arguments."""
        return {
            "success": False,
            "message": message,
            "syntax": self.shell_commands[command].syntax,
        }

    @staticmethod
    def _table_missing(table_name: str) -> Dict[str, Any]:
        """Build the failure result for a reference to an unknown table."""
        return {
            "success": False,
            "message": f"表 '{table_name}' 不存在"
        }

    def _parse_command(self, command_line: str) -> Dict[str, Any]:
        """Split a command line into the command keyword and raw arguments.

        Quoted strings stay intact even when they contain spaces or commas,
        an option hash ``{...}`` is kept as a single argument, and the commas
        separating arguments are dropped. Arguments keep their quotes; the
        handlers strip them.

        Returns:
            ``{"command": <lower-cased keyword>, "args": [<raw tokens>]}``.

        Raises:
            ValueError: If the line contains no command.
        """
        tokens = self._TOKEN_RE.findall(command_line.strip())
        if not tokens:
            raise ValueError("空命令")
        return {"command": tokens[0].lower(), "args": tokens[1:]}

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------

    def execute_command(self, command_line: str) -> Dict[str, Any]:
        """Parse and run one shell command line.

        The call never raises: parse errors and handler exceptions are turned
        into a failure result.

        Returns:
            The handler's result dictionary. It always contains ``success``,
            ``message`` and ``execution_time`` (milliseconds). The same
            dictionary is stored in the appended history entry.
        """
        started = time.perf_counter()

        # Created before parsing so that a parse failure is recorded as well.
        history_entry: Dict[str, Any] = {
            "timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            "command": command_line,
            "status": CommandStatus.SUCCESS.value,
        }

        try:
            parsed = self._parse_command(command_line)
            command_name = parsed["command"]
            if command_name in self._SUPPORTED_COMMANDS:
                handler = getattr(self, f"_execute_{command_name}")
                result = handler(parsed["args"])
            else:
                result = {
                    "success": False,
                    "message": f"未知命令: {command_name}",
                    "suggestion": "使用 'help' 查看可用命令"
                }
        except Exception as e:  # boundary: report instead of crashing the shell
            result = {
                "success": False,
                "message": f"命令执行错误: {str(e)}",
            }

        result["execution_time"] = round((time.perf_counter() - started) * 1000, 2)

        # The history status mirrors the actual outcome of the command.
        if not result.get("success"):
            history_entry["status"] = CommandStatus.FAILED.value
        history_entry["result"] = result
        self.command_history.append(history_entry)

        return result

    # ------------------------------------------------------------------
    # DDL handlers
    # ------------------------------------------------------------------

    def _execute_create(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``create 'table', 'cf1', {NAME => 'cf2', VERSIONS => 3}, ...``.

        Each column family is either a plain name or an option hash with
        ``NAME`` and optional ``VERSIONS``, ``TTL`` and ``COMPRESSION``.
        Creating a table that already exists fails instead of replacing it.
        """
        if len(args) < 2:
            return self._usage_error("create", "create命令需要表名和至少一个列族")

        table_name = self._unquote(args[0])
        if table_name in self.tables:
            return {
                "success": False,
                "message": f"表 '{table_name}' 已存在"
            }

        families: Dict[str, Dict[str, Any]] = {}
        for spec in args[1:]:
            options = self._parse_options(spec)
            if options is None:
                families[self._unquote(spec)] = {
                    "max_versions": 1,
                    "ttl": None,
                    "compression": "NONE"
                }
                continue

            if not options.get("NAME"):
                return self._usage_error("create", "列族定义缺少 NAME")
            ttl = options.get("TTL")
            try:
                families[options["NAME"]] = {
                    "max_versions": int(options.get("VERSIONS", 1)),
                    "ttl": int(ttl) if ttl is not None else None,
                    "compression": options.get("COMPRESSION", "NONE"),
                }
            except ValueError:
                return self._usage_error("create", "VERSIONS 和 TTL 必须是整数")

        self.tables[table_name] = {
            "name": table_name,
            "column_families": families,
            "data": {},
            "created_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }

        return {
            "success": True,
            "message": f"表 '{table_name}' 创建成功",
            "table_name": table_name,
            "column_families": list(families)
        }

    def _execute_drop(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``drop 'table'``: remove the table and all of its data."""
        if len(args) < 1:
            return self._usage_error("drop", "drop命令需要表名")

        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._table_missing(table_name)

        del self.tables[table_name]

        return {
            "success": True,
            "message": f"表 '{table_name}' 删除成功",
            "table_name": table_name
        }

    def _execute_describe(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``describe 'table'``: report column families and creation time."""
        if len(args) < 1:
            return self._usage_error("describe", "describe命令需要表名")

        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._table_missing(table_name)

        table_info = self.tables[table_name]

        return {
            "success": True,
            "message": f"表 '{table_name}' 结构信息",
            "table_name": table_name,
            "column_families": table_info["column_families"],
            "created_time": table_info["created_time"]
        }

    # ------------------------------------------------------------------
    # DML handlers
    # ------------------------------------------------------------------

    def _execute_put(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``put 'table', 'row', 'cf:qualifier', 'value'``.

        The cell is stored with the current time in milliseconds and replaces
        any previous value of the same column.
        """
        if len(args) < 4:
            return self._usage_error("put", "put命令需要表名、行键、列名和值")

        table_name, row_key, column, value = (self._unquote(a) for a in args[:4])

        if table_name not in self.tables:
            return self._table_missing(table_name)

        if ':' not in column:
            return {
                "success": False,
                "message": "列名格式错误,应为 'column_family:qualifier'"
            }

        cf = column.split(':', 1)[0]
        table = self.tables[table_name]
        if cf not in table["column_families"]:
            return {
                "success": False,
                "message": f"列族 '{cf}' 不存在"
            }

        table["data"].setdefault(row_key, {})[column] = {
            "value": value,
            "timestamp": int(datetime.now().timestamp() * 1000)
        }

        return {
            "success": True,
            "message": f"数据插入成功",
            "table_name": table_name,
            "row_key": row_key,
            "column": column,
            "value": value
        }

    def _execute_get(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``get 'table', 'row'[, 'cf:qualifier' | {COLUMN => ...}]``.

        A missing row is reported as a successful call with empty ``data``.
        The returned ``data`` is a copy, so later writes do not alter results
        that were already handed out or stored in the history.
        """
        if len(args) < 2:
            return self._usage_error("get", "get命令需要表名和行键")

        table_name = self._unquote(args[0])
        row_key = self._unquote(args[1])

        column_filter = None
        if len(args) > 2:
            options = self._parse_options(args[2])
            if options is None:
                column_filter = self._unquote(args[2])
            else:
                column_filter = options.get("COLUMN")

        if table_name not in self.tables:
            return self._table_missing(table_name)

        rows = self.tables[table_name]["data"]
        if row_key not in rows:
            return {
                "success": True,
                "message": f"行 '{row_key}' 不存在",
                "data": {}
            }

        row_data = rows[row_key]
        if column_filter:
            filtered_data = {k: v for k, v in row_data.items() if k == column_filter}
        else:
            filtered_data = dict(row_data)

        return {
            "success": True,
            "message": f"获取数据成功",
            "table_name": table_name,
            "row_key": row_key,
            "data": filtered_data
        }

    def _execute_scan(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``scan 'table'[, {STARTROW => ..., ENDROW => ..., LIMIT => n}]``.

        Rows are returned in row-key order. ``STARTROW`` is inclusive and
        ``ENDROW`` (alias ``STOPROW``) is exclusive. Other options are ignored.
        """
        if len(args) < 1:
            return self._usage_error("scan", "scan命令需要表名")

        table_name = self._unquote(args[0])
        if table_name not in self.tables:
            return self._table_missing(table_name)

        options = (self._parse_options(args[1]) if len(args) > 1 else None) or {}
        start_row = options.get("STARTROW")
        end_row = options.get("ENDROW", options.get("STOPROW"))
        limit = None
        if "LIMIT" in options:
            try:
                limit = int(options["LIMIT"])
            except ValueError:
                return self._usage_error("scan", "LIMIT 必须是整数")

        results = []
        for row_key, row_data in sorted(self.tables[table_name]["data"].items()):
            if start_row is not None and row_key < start_row:
                continue
            if end_row is not None and row_key >= end_row:
                continue
            if limit is not None and len(results) >= limit:
                break
            # Copy the row so the result is a snapshot, not a live view.
            results.append({
                "row_key": row_key,
                "data": dict(row_data)
            })

        return {
            "success": True,
            "message": f"扫描完成",
            "table_name": table_name,
            "results": results,
            "count": len(results)
        }

    def _execute_delete(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``delete 'table', 'row', 'cf:qualifier'``.

        Removes one cell; a row left without cells is removed as well.
        """
        if len(args) < 3:
            return self._usage_error("delete", "delete命令需要表名、行键和列名")

        table_name, row_key, column = (self._unquote(a) for a in args[:3])

        if table_name not in self.tables:
            return self._table_missing(table_name)

        rows = self.tables[table_name]["data"]
        if row_key not in rows:
            return {
                "success": False,
                "message": f"行 '{row_key}' 不存在"
            }

        if column not in rows[row_key]:
            return {
                "success": False,
                "message": f"列 '{column}' 不存在"
            }

        deleted_value = rows[row_key].pop(column)
        if not rows[row_key]:
            del rows[row_key]

        return {
            "success": True,
            "message": f"删除成功",
            "table_name": table_name,
            "row_key": row_key,
            "column": column,
            "deleted_value": deleted_value
        }

    # ------------------------------------------------------------------
    # Admin handlers
    # ------------------------------------------------------------------

    def _execute_list(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``list`` / ``list 'pattern'``.

        The pattern is a regular expression matched from the start of the
        table name. A bare ``*`` (not preceded by ``.``) is accepted as a
        glob-style wildcard, so both ``user*`` and ``user.*`` work.
        """
        pattern = self._unquote(args[0]) if args else None
        table_names = list(self.tables)

        if pattern:
            regex_pattern = re.sub(r"(?<!\.)\*", ".*", pattern)
            try:
                matcher = re.compile(regex_pattern)
            except re.error:
                return {
                    "success": False,
                    "message": f"无效的表名模式: {pattern}"
                }
            table_names = [name for name in table_names if matcher.match(name)]

        return {
            "success": True,
            "message": f"找到 {len(table_names)} 个表",
            "tables": table_names
        }

    def _execute_status(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``status`` / ``status 'simple'`` / ``status 'detailed'``.

        Cluster facts are fixed sample values; the table count comes from the
        simulator and "regions" is the number of stored rows.
        """
        format_type = self._unquote(args[0]) if args else "simple"

        status_info = {
            "cluster_id": "hbase-cluster-001",
            "master": "localhost:16000",
            "version": "2.4.9",
            "region_servers": 1,
            "tables": len(self.tables),
            "regions": sum(len(table["data"]) for table in self.tables.values()),
            "uptime": "2 days, 5 hours, 30 minutes"
        }

        if format_type == "detailed":
            status_info["detailed_info"] = {
                "heap_memory": "512MB / 2GB",
                "region_server_list": ["localhost:16020"],
                "table_details": [
                    {"name": name, "regions": len(table["data"])}
                    for name, table in self.tables.items()
                ]
            }

        return {
            "success": True,
            "message": "集群状态",
            "status": status_info
        }

    def _execute_version(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``version``: return fixed sample version information."""
        return {
            "success": True,
            "message": "HBase版本信息",
            "version": "2.4.9",
            "revision": "abc123def456",
            "compiled": "2023-12-01 10:30:00",
            "java_version": "1.8.0_301"
        }

    def _execute_help(self, args: List[str]) -> Dict[str, Any]:
        """Handle ``help`` (all commands grouped by type) or ``help <command>``."""
        if args:
            command_name = self._unquote(args[0]).lower()
            cmd = self.shell_commands.get(command_name)
            if cmd is None:
                return {
                    "success": False,
                    "message": f"未知命令: {command_name}"
                }
            return {
                "success": True,
                "message": f"命令 '{command_name}' 帮助信息",
                "command": cmd.command,
                "description": cmd.description,
                "syntax": cmd.syntax,
                "examples": cmd.examples,
                "parameters": cmd.parameters
            }

        commands_by_type: Dict[str, List[Dict[str, str]]] = {}
        for cmd in self.shell_commands.values():
            commands_by_type.setdefault(cmd.command_type.value, []).append({
                "command": cmd.command,
                "description": cmd.description
            })

        return {
            "success": True,
            "message": "HBase Shell 帮助信息",
            "commands": commands_by_type
        }

    # ------------------------------------------------------------------
    # History
    # ------------------------------------------------------------------

    def get_command_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return the most recent *limit* history entries, oldest first.

        A non-positive *limit* yields an empty list.
        """
        if limit <= 0:
            # ``[-0:]`` would return the whole list, so handle it explicitly.
            return []
        return self.command_history[-limit:]

    def clear_history(self) -> bool:
        """Remove all history entries; always returns ``True``."""
        self.command_history.clear()
        return True

# Usage example: replay a fixed list of commands against the simulator
shell = HBaseShellSimulator()

print("=== HBase Shell 模拟器 ===")

# Commands are executed in order, so later ones see earlier changes
commands = [
    "version",
    "status",
    "create 'users', 'info', 'stats'",
    "list",
    "describe 'users'",
    "put 'users', 'user001', 'info:name', 'Zhang San'",
    "put 'users', 'user001', 'info:age', '25'",
    "put 'users', 'user001', 'stats:login_count', '100'",
    "get 'users', 'user001'",
    "get 'users', 'user001', 'info:name'",
    "scan 'users'",
    "delete 'users', 'user001', 'info:age'",
    "get 'users', 'user001'"
]

for cmd in commands:
    print(f"\n> {cmd}")
    result = shell.execute_command(cmd)

    if not result["success"]:
        print(f"❌ {result['message']}")
    else:
        print(f"✅ {result['message']}")
        # Only the payload keys present in this result are printed
        if "data" in result:
            print(f"   数据: {result['data']}")
        if "tables" in result:
            print(f"   表列表: {result['tables']}")
        if "results" in result:
            print(f"   扫描结果: {len(result['results'])} 行")

    print(f"   执行时间: {result['execution_time']}ms")

# Show the five most recent history entries
print("\n=== 命令历史 ===")
history = shell.get_command_history(5)
for i, entry in enumerate(history, 1):
    status_icon = "❌" if entry["status"] != "success" else "✅"
    print(f"{i}. {status_icon} [{entry['timestamp']}] {entry['command']}")

2. Java API操作

2.1 连接管理

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Callable
from datetime import datetime
import json
import threading
import time

class ConnectionState(Enum):
    """Lifecycle state of a simulated HBase connection."""
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    RECONNECTING = "reconnecting"
    FAILED = "failed"

class OperationType(Enum):
    """Kind of table operation recorded in an operation result."""
    GET = "get"
    PUT = "put"
    DELETE = "delete"
    SCAN = "scan"
    BATCH = "batch"

@dataclass
class ConnectionConfig:
    """Connection settings for the simulated HBase client."""
    zookeeper_quorum: str = "localhost:2181"
    zookeeper_znode_parent: str = "/hbase"
    client_timeout: int = 30000  # 30 seconds
    rpc_timeout: int = 60000  # 60 seconds
    retries: int = 3
    retry_pause: int = 1000  # 1 second
    max_connections: int = 100
    connection_pool_size: int = 10

@dataclass
class OperationResult:
    """Outcome of a single table operation.

    ``timestamp`` is filled in at construction time (millisecond precision)
    whenever the caller does not supply one.
    """
    success: bool
    operation_type: OperationType
    table_name: str
    row_key: Optional[str] = None
    data: Optional[Dict[str, Any]] = None
    error_message: Optional[str] = None
    execution_time: float = 0.0
    timestamp: str = None

    def __post_init__(self):
        # An explicitly supplied timestamp is kept as-is.
        if self.timestamp is not None:
            return
        formatted = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        # Drop the last three digits: microseconds -> milliseconds.
        self.timestamp = formatted[:-3]

class HBaseConnection:
    """Simulated HBase connection that tracks its state and activity counters."""

    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.state = ConnectionState.DISCONNECTED
        self.connection_time: Optional[datetime] = None
        self.last_activity: Optional[datetime] = None
        self.operation_count = 0
        self.error_count = 0
        self._lock = threading.Lock()

    def connect(self) -> bool:
        """Open the connection.

        Returns:
            ``True`` on success; ``False`` when connecting failed, in which
            case the state is set to ``FAILED``.
        """
        with self._lock:
            try:
                self.state = ConnectionState.CONNECTING

                # Simulated network latency
                time.sleep(0.1)

                quorum = self.config.zookeeper_quorum
                if not quorum:
                    raise ValueError("ZooKeeper地址不能为空")

                self.state = ConnectionState.CONNECTED
                self.connection_time = datetime.now()
                self.last_activity = datetime.now()

                print(f"连接到HBase成功: {self.config.zookeeper_quorum}")
                return True

            except Exception as exc:
                self.state = ConnectionState.FAILED
                print(f"连接失败: {exc}")
                return False

    def disconnect(self) -> bool:
        """Close the connection if it is open.

        Returns:
            ``True`` unless an exception occurred while disconnecting.
        """
        with self._lock:
            try:
                currently_open = self.state == ConnectionState.CONNECTED
                if currently_open:
                    self.state = ConnectionState.DISCONNECTED
                    print("连接已断开")
                return True
            except Exception as exc:
                print(f"断开连接失败: {exc}")
                return False

    def is_connected(self) -> bool:
        """Return whether the connection is in the ``CONNECTED`` state."""
        return self.state == ConnectionState.CONNECTED

    def get_connection_info(self) -> Dict[str, Any]:
        """Return a snapshot of state, configuration and counters.

        Timestamps are formatted as ``YYYY-MM-DD HH:MM:SS`` and are ``None``
        until the connection has been established.
        """
        def as_text(moment: Optional[datetime]) -> Optional[str]:
            return moment.strftime('%Y-%m-%d %H:%M:%S') if moment else None

        if self.connection_time:
            uptime = (datetime.now() - self.connection_time).total_seconds()
        else:
            uptime = None

        return {
            "state": self.state.value,
            "zookeeper_quorum": self.config.zookeeper_quorum,
            "connection_time": as_text(self.connection_time),
            "uptime_seconds": uptime,
            "operation_count": self.operation_count,
            "error_count": self.error_count,
            "last_activity": as_text(self.last_activity)
        }

    def _update_activity(self):
        """Record that an operation just ran on this connection."""
        self.last_activity = datetime.now()
        self.operation_count += 1

class HBaseTable:
    """Simulated HBase table backed by an in-memory dictionary.

    Rows are stored as ``{row_key: {"family:qualifier": cell}}`` where a cell
    is ``{"value": ..., "timestamp": <ms>}``. Every operation, successful or
    not, is appended to ``operation_history``.
    """

    def __init__(self, connection: HBaseConnection, table_name: str):
        self.connection = connection
        self.table_name = table_name
        self.operation_history: List[OperationResult] = []
        self._data_store: Dict[str, Dict[str, Any]] = {}  # simulated storage

    def _run(self, operation_type: OperationType, row_key: Optional[str],
             action: Callable[[], Dict[str, Any]]) -> OperationResult:
        """Run *action* with the bookkeeping shared by all operations.

        Checks the connection, times the call, updates the connection's
        activity/error counters and records the result in the history.
        Exceptions raised by *action* are converted into a failed result.
        """
        start_time = time.time()
        data: Optional[Dict[str, Any]] = None
        error_message: Optional[str] = None
        succeeded = False

        try:
            if not self.connection.is_connected():
                raise Exception("连接未建立")
            data = action()
            self.connection._update_activity()
            succeeded = True
        except Exception as e:
            self.connection.error_count += 1
            data = None
            error_message = str(e)

        result = OperationResult(
            success=succeeded,
            operation_type=operation_type,
            table_name=self.table_name,
            row_key=row_key,
            data=data,
            error_message=error_message,
            execution_time=round((time.time() - start_time) * 1000, 2)
        )
        self.operation_history.append(result)
        return result

    def put(self, row_key: str, column_family: str, qualifier: str,
           value: Any, timestamp: Optional[int] = None) -> OperationResult:
        """Insert or overwrite one cell.

        Args:
            timestamp: Cell timestamp in milliseconds. The current time is
                used only when this is ``None``; an explicit ``0`` is kept.

        Returns:
            Result whose ``data`` maps the column name to the stored cell.
        """
        def action() -> Dict[str, Any]:
            column_name = f"{column_family}:{qualifier}"
            cell_data = {
                "value": value,
                "timestamp": (
                    timestamp if timestamp is not None
                    else int(datetime.now().timestamp() * 1000)
                )
            }
            self._data_store.setdefault(row_key, {})[column_name] = cell_data
            return {column_name: cell_data}

        return self._run(OperationType.PUT, row_key, action)

    def get(self, row_key: str, column_family: Optional[str] = None,
           qualifier: Optional[str] = None) -> OperationResult:
        """Read one cell, one column family, or a whole row.

        With both *column_family* and *qualifier* a single column is read;
        with only *column_family* all columns of that family; otherwise the
        whole row. A missing row or column yields empty ``data``.
        """
        def action() -> Dict[str, Any]:
            row_data = self._data_store.get(row_key)
            if row_data is None:
                return {}
            if column_family and qualifier:
                column_name = f"{column_family}:{qualifier}"
                if column_name in row_data:
                    return {column_name: row_data[column_name]}
                return {}
            if column_family:
                prefix = f"{column_family}:"
                return {k: v for k, v in row_data.items() if k.startswith(prefix)}
            return row_data.copy()

        return self._run(OperationType.GET, row_key, action)

    def delete(self, row_key: str, column_family: Optional[str] = None,
              qualifier: Optional[str] = None) -> OperationResult:
        """Delete one cell, one column family, or a whole row.

        Deleting something that does not exist succeeds with empty ``data``.
        A row left without columns is removed from the store.

        Returns:
            Result whose ``data`` holds the cells that were removed.
        """
        def action() -> Dict[str, Any]:
            deleted_data: Dict[str, Any] = {}
            row_data = self._data_store.get(row_key)
            if row_data is None:
                return deleted_data

            if column_family and qualifier:
                column_name = f"{column_family}:{qualifier}"
                if column_name in row_data:
                    deleted_data[column_name] = row_data.pop(column_name)
            elif column_family:
                prefix = f"{column_family}:"
                # Collect keys first: the dict must not change while iterating.
                for key in [k for k in row_data if k.startswith(prefix)]:
                    deleted_data[key] = row_data.pop(key)
            else:
                return self._data_store.pop(row_key)

            if not row_data:
                del self._data_store[row_key]
            return deleted_data

        return self._run(OperationType.DELETE, row_key, action)

    def scan(self, start_row: Optional[str] = None, end_row: Optional[str] = None,
            limit: int = 100) -> OperationResult:
        """Return rows in row-key order.

        Args:
            start_row: Inclusive lower bound on the row key, if given.
            end_row: Exclusive upper bound on the row key, if given.
            limit: Maximum number of rows returned.

        Returns:
            Result whose ``data`` is ``{"results": [...], "count": n}``. Each
            row is a copy, so later writes do not change a recorded scan.
        """
        def action() -> Dict[str, Any]:
            row_keys = sorted(self._data_store)
            if start_row:
                row_keys = [rk for rk in row_keys if rk >= start_row]
            if end_row:
                row_keys = [rk for rk in row_keys if rk < end_row]
            row_keys = row_keys[:limit]

            scan_results = [
                {"row_key": rk, "data": dict(self._data_store[rk])}
                for rk in row_keys
            ]
            return {"results": scan_results, "count": len(scan_results)}

        return self._run(OperationType.SCAN, None, action)

    def get_operation_statistics(self) -> Dict[str, Any]:
        """Summarise the operation history.

        Returns:
            ``{"total_operations": 0}`` when nothing has run yet; otherwise
            totals, success rate (percent), execution times (milliseconds)
            and the same figures broken down by operation type.
        """
        if not self.operation_history:
            return {"total_operations": 0}

        type_stats: Dict[str, Dict[str, Any]] = {}
        total_time = 0
        success_count = 0

        for op in self.operation_history:
            stats = type_stats.setdefault(
                op.operation_type.value,
                {"count": 0, "success": 0, "total_time": 0},
            )
            stats["count"] += 1
            stats["total_time"] += op.execution_time
            if op.success:
                stats["success"] += 1
                success_count += 1
            total_time += op.execution_time

        for stats in type_stats.values():
            stats["avg_time"] = round(stats["total_time"] / stats["count"], 2)
            stats["success_rate"] = round(stats["success"] / stats["count"] * 100, 2)

        total_operations = len(self.operation_history)
        return {
            "total_operations": total_operations,
            "success_count": success_count,
            "success_rate": round(success_count / total_operations * 100, 2),
            "total_execution_time": round(total_time, 2),
            "avg_execution_time": round(total_time / total_operations, 2),
            "operations_by_type": type_stats
        }

class HBaseClient:
    """Client facade that owns one connection and caches table handles."""

    def __init__(self, config: ConnectionConfig):
        self.config = config
        self.connection = HBaseConnection(config)
        # Table handles already handed out, keyed by table name.
        self.tables: Dict[str, HBaseTable] = {}

    def connect(self) -> bool:
        """Open the underlying connection and return its result."""
        outcome = self.connection.connect()
        return outcome

    def disconnect(self) -> bool:
        """Close the underlying connection and return its result."""
        outcome = self.connection.disconnect()
        return outcome

    def get_table(self, table_name: str) -> HBaseTable:
        """Return the cached handle for ``table_name``, creating it on first use."""
        handle = self.tables.get(table_name)
        if handle is None:
            handle = HBaseTable(self.connection, table_name)
            self.tables[table_name] = handle
        return handle

    def get_client_statistics(self) -> Dict[str, Any]:
        """Collect connection info plus per-table operation statistics."""
        connection_info = self.connection.get_connection_info()
        per_table = {
            name: handle.get_operation_statistics()
            for name, handle in self.tables.items()
        }
        return {
            "connection": connection_info,
            "tables": per_table,
            "total_tables_accessed": len(self.tables)
        }

# Usage example: connect, run put/get/scan/delete against one table, then
# print the collected statistics.
print("=== HBase Java API 模拟器 ===")

# Build the connection configuration
config = ConnectionConfig(
    zookeeper_quorum="localhost:2181",
    client_timeout=30000,
    rpc_timeout=60000,
    retries=3
)

# Create the client and connect
client = HBaseClient(config)
connection_success = client.connect()
print(f"连接结果: {connection_success}")

if connection_success:
    # Obtain the table handle
    users_table = client.get_table("users")
    
    # Run the data operations
    print("\n=== 执行数据操作 ===")
    
    # Insert three cells into the same row
    put_result = users_table.put("user001", "info", "name", "张三")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    
    put_result = users_table.put("user001", "info", "age", "25")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    
    put_result = users_table.put("user001", "stats", "login_count", "100")
    print(f"PUT操作: {put_result.success}, 耗时: {put_result.execution_time}ms")
    
    # Read the whole row back
    get_result = users_table.get("user001")
    print(f"\nGET操作: {get_result.success}, 耗时: {get_result.execution_time}ms")
    if get_result.success:
        print(f"获取到数据: {len(get_result.data)} 列")
        for column, cell_data in get_result.data.items():
            print(f"  {column}: {cell_data['value']}")
    
    # Read a single column
    get_result = users_table.get("user001", "info", "name")
    print(f"\nGET特定列: {get_result.success}, 数据: {get_result.data}")
    
    # Scan the table
    scan_result = users_table.scan()
    print(f"\nSCAN操作: {scan_result.success}, 耗时: {scan_result.execution_time}ms")
    if scan_result.success:
        print(f"扫描到 {scan_result.data['count']} 行数据")
    
    # Delete one column
    delete_result = users_table.delete("user001", "info", "age")
    print(f"\nDELETE操作: {delete_result.success}, 耗时: {delete_result.execution_time}ms")
    
    # Read the row again to verify the delete
    get_result = users_table.get("user001")
    print(f"删除后GET操作: {get_result.success}, 剩余列数: {len(get_result.data)}")
    
    # Print the statistics
    print("\n=== 操作统计 ===")
    table_stats = users_table.get_operation_statistics()
    print(f"表操作统计: {json.dumps(table_stats, indent=2, ensure_ascii=False)}")
    
    client_stats = client.get_client_statistics()
    print(f"\n客户端统计: {json.dumps(client_stats, indent=2, ensure_ascii=False)}")
    
    # Disconnect
    # NOTE(review): the later batch/filter examples reuse users_table, which was
    # built on this client's connection — confirm they still work after this
    # disconnect, and that users_table exists when the connection failed.
    client.disconnect()

2.2 批量操作

from typing import List, Tuple

class BatchOperation:
    """Queue of put/delete operations executed against one table in order.

    Operations are collected with the chainable ``add_put`` / ``add_delete``
    methods and run by ``execute``. A summary ``OperationResult`` of type
    ``OperationType.BATCH`` is appended to the table's operation history
    for every ``execute`` call, whether it succeeds or fails.
    """

    def __init__(self, table: HBaseTable):
        self.table = table
        # Pending operations, each a dict with a "type" key ("put" or "delete").
        self.operations: List[Dict[str, Any]] = []

    def add_put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> 'BatchOperation':
        """Queue a PUT of ``value`` into ``column_family:qualifier`` of ``row_key``.

        Returns:
            This batch, so calls can be chained.
        """
        self.operations.append({
            "type": "put",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier,
            "value": value
        })
        return self

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                  qualifier: Optional[str] = None) -> 'BatchOperation':
        """Queue a DELETE for ``row_key``, optionally narrowed to a family/qualifier.

        Returns:
            This batch, so calls can be chained.
        """
        self.operations.append({
            "type": "delete",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier
        })
        return self

    def _apply(self, op: Dict[str, Any]) -> OperationResult:
        """Run a single queued operation against the table.

        Raises:
            ValueError: if ``op["type"]`` is neither "put" nor "delete".
                Previously such an entry either hit an unbound local or
                silently re-appended the previous operation's result.
        """
        kind = op["type"]
        if kind == "put":
            return self.table.put(
                op["row_key"],
                op["column_family"],
                op["qualifier"],
                op["value"]
            )
        if kind == "delete":
            return self.table.delete(
                op["row_key"],
                op["column_family"],
                op["qualifier"]
            )
        raise ValueError(f"unsupported batch operation type: {kind!r}")

    def execute(self) -> List[OperationResult]:
        """Run every queued operation in insertion order.

        Returns:
            The per-operation results collected so far. When the batch
            aborts (no connection, unsupported operation type) the list
            holds only the operations that ran before the failure; the
            failure itself is recorded in the table's operation history.
        """
        results: List[OperationResult] = []
        start_time = time.time()

        try:
            if not self.table.connection.is_connected():
                raise Exception("连接未建立")

            for op in self.operations:
                results.append(self._apply(op))

            # Summary entry describing the whole batch.
            batch_result = OperationResult(
                success=all(r.success for r in results),
                operation_type=OperationType.BATCH,
                table_name=self.table.table_name,
                data={
                    "operations_count": len(self.operations),
                    "success_count": sum(1 for r in results if r.success),
                    "results": [{
                        "operation": i,
                        "success": r.success,
                        "error": r.error_message
                    } for i, r in enumerate(results)]
                },
                execution_time=round((time.time() - start_time) * 1000, 2)
            )

        except Exception as e:
            batch_result = OperationResult(
                success=False,
                operation_type=OperationType.BATCH,
                table_name=self.table.table_name,
                error_message=str(e),
                execution_time=round((time.time() - start_time) * 1000, 2)
            )

        # Exactly one summary entry per execute() call, on either path.
        self.table.operation_history.append(batch_result)
        return results

    def clear(self) -> 'BatchOperation':
        """Drop all queued operations and return this batch for chaining."""
        self.operations.clear()
        return self

# Batch operation example (reuses users_table from the earlier example)
print("\n=== 批量操作示例 ===")

# Create the batch
batch = BatchOperation(users_table)

# Queue several puts via the chainable API
batch.add_put("user002", "info", "name", "李四") \
     .add_put("user002", "info", "age", "30") \
     .add_put("user002", "stats", "login_count", "50") \
     .add_put("user003", "info", "name", "王五") \
     .add_put("user003", "info", "age", "28")

# Execute the batch and report each operation's outcome
results = batch.execute()
print(f"批量操作完成,共 {len(results)} 个操作")
for i, result in enumerate(results):
    print(f"  操作 {i+1}: {'成功' if result.success else '失败'}")

# Verify the inserts with a scan
# NOTE(review): scan_result.data is indexed without checking
# scan_result.success — confirm data is populated on failure.
scan_result = users_table.scan()
print(f"\n表中共有 {scan_result.data['count']} 行数据")

2.3 过滤器操作

from abc import ABC, abstractmethod
import re

class FilterType(Enum):
    """Kinds of cell attribute a filter can target."""
    ROW_KEY = "row_key"
    COLUMN_FAMILY = "column_family"
    COLUMN_QUALIFIER = "column_qualifier"
    VALUE = "value"
    TIMESTAMP = "timestamp"

class CompareOperator(Enum):
    """Comparison operators understood by the filters in this file."""
    EQUAL = "equal"
    NOT_EQUAL = "not_equal"
    GREATER = "greater"
    GREATER_OR_EQUAL = "greater_or_equal"
    LESS = "less"
    LESS_OR_EQUAL = "less_or_equal"
    # Pattern match; the filters below apply it with re.match (anchored at the start).
    REGEX = "regex"
    # Containment test (``value in candidate``).
    SUBSTRING = "substring"

class HBaseFilter(ABC):
    """Abstract base class that every row filter derives from."""

    @abstractmethod
    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return whether the row identified by ``row_key`` passes the filter."""

    @abstractmethod
    def get_filter_info(self) -> Dict[str, Any]:
        """Return a dictionary describing this filter."""

class RowKeyFilter(HBaseFilter):
    """Filter that compares each row key against a fixed reference value."""

    def __init__(self, operator: CompareOperator, value: str):
        self.operator = operator
        self.value = value

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return whether ``row_key`` satisfies the configured comparison.

        ``row_data`` is not consulted. An operator outside the table
        below yields ``False``.
        """
        reference = self.value
        # Each check is deferred so only the selected comparison runs.
        checks = {
            CompareOperator.EQUAL: lambda: row_key == reference,
            CompareOperator.NOT_EQUAL: lambda: row_key != reference,
            CompareOperator.GREATER: lambda: row_key > reference,
            CompareOperator.GREATER_OR_EQUAL: lambda: row_key >= reference,
            CompareOperator.LESS: lambda: row_key < reference,
            CompareOperator.LESS_OR_EQUAL: lambda: row_key <= reference,
            CompareOperator.REGEX: lambda: bool(re.match(reference, row_key)),
            CompareOperator.SUBSTRING: lambda: reference in row_key,
        }
        selected = checks.get(self.operator)
        if selected is None:
            return False
        return selected()

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this filter: its type name, operator value and reference value."""
        description = {"type": "RowKeyFilter"}
        description["operator"] = self.operator.value
        description["value"] = self.value
        return description

class ValueFilter(HBaseFilter):
    """Filter rows by the value stored in one ``family:qualifier`` column.

    Rows that do not contain the column never pass. Equality, regex and
    substring checks operate on the ``str()`` form of both values.
    Ordering checks compare numerically when both values convert with
    ``float()`` and fall back to string comparison otherwise.
    """

    # The four ordering operators share one numeric-then-string strategy.
    _ORDERING = {
        CompareOperator.GREATER: lambda left, right: left > right,
        CompareOperator.GREATER_OR_EQUAL: lambda left, right: left >= right,
        CompareOperator.LESS: lambda left, right: left < right,
        CompareOperator.LESS_OR_EQUAL: lambda left, right: left <= right,
    }

    def __init__(self, column_family: str, qualifier: str,
                 operator: CompareOperator, value: Any):
        self.column_family = column_family
        self.qualifier = qualifier
        self.operator = operator
        self.value = value
        # Key under which the cell appears in a row's data dict.
        self.column_name = f"{column_family}:{qualifier}"

    def _compare_ordered(self, cell_value: Any, compare) -> bool:
        """Apply ``compare`` numerically if possible, else on the string forms.

        ``float()`` raises ValueError for non-numeric strings and TypeError
        for values such as ``None``; both trigger the string fallback so a
        single odd cell cannot abort a whole scan.
        """
        try:
            return compare(float(cell_value), float(self.value))
        except (TypeError, ValueError):
            return compare(str(cell_value), str(self.value))

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return whether the row's cell value satisfies the comparison.

        Args:
            row_key: Key of the row being tested (not used in the decision).
            row_data: Mapping of ``family:qualifier`` to a cell dict that
                carries the stored value under ``"value"``.

        Returns:
            ``False`` when the column is absent or the operator is not
            recognised; otherwise the result of the comparison.
        """
        if self.column_name not in row_data:
            return False

        cell_value = row_data[self.column_name]["value"]
        op = self.operator

        if op == CompareOperator.EQUAL:
            return str(cell_value) == str(self.value)
        if op == CompareOperator.NOT_EQUAL:
            return str(cell_value) != str(self.value)

        compare = self._ORDERING.get(op)
        if compare is not None:
            return self._compare_ordered(cell_value, compare)

        if op == CompareOperator.REGEX:
            return bool(re.match(str(self.value), str(cell_value)))
        if op == CompareOperator.SUBSTRING:
            return str(self.value) in str(cell_value)
        return False

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this filter: type name, column, operator value and reference value."""
        return {
            "type": "ValueFilter",
            "column": self.column_name,
            "operator": self.operator.value,
            "value": self.value
        }

class FilterList(HBaseFilter):
    """Composite filter combining member filters with AND or OR."""

    def __init__(self, logical_operator: str = "AND"):
        # Stored upper-cased; "AND" and "OR" are the recognised values.
        self.logical_operator = logical_operator.upper()
        self.filters: List[HBaseFilter] = []

    def add_filter(self, filter_obj: HBaseFilter) -> 'FilterList':
        """Append a member filter and return this list for chaining."""
        self.filters.append(filter_obj)
        return self

    def filter_row(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Evaluate every member filter and combine the verdicts.

        An empty list passes every row, as does an operator other than
        "AND"/"OR".
        """
        if not self.filters:
            return True

        # All members are evaluated before combining (no short-circuit).
        verdicts = [member.filter_row(row_key, row_data) for member in self.filters]

        combiner = {"AND": all, "OR": any}.get(self.logical_operator)
        return True if combiner is None else combiner(verdicts)

    def get_filter_info(self) -> Dict[str, Any]:
        """Describe this list and, recursively, each member filter."""
        member_info = [member.get_filter_info() for member in self.filters]
        return {
            "type": "FilterList",
            "logical_operator": self.logical_operator,
            "filters": member_info
        }

class FilteredScan:
    """Scan builder that applies row-range bounds, filters and a row limit.

    Configure with the chainable setters, then call ``execute``. A row is
    returned only if every configured filter accepts it.
    """

    def __init__(self, table: HBaseTable):
        self.table = table
        self.filters: List[HBaseFilter] = []
        self.start_row: Optional[str] = None  # inclusive lower bound
        self.end_row: Optional[str] = None    # exclusive upper bound
        self.limit: int = 100                 # maximum rows returned

    def set_filter(self, filter_obj: HBaseFilter) -> 'FilteredScan':
        """Replace all configured filters with ``filter_obj``."""
        self.filters = [filter_obj]
        return self

    def add_filter(self, filter_obj: HBaseFilter) -> 'FilteredScan':
        """Add ``filter_obj`` to the filters a row must pass."""
        self.filters.append(filter_obj)
        return self

    def set_start_row(self, start_row: str) -> 'FilteredScan':
        """Set the inclusive start row key."""
        self.start_row = start_row
        return self

    def set_end_row(self, end_row: str) -> 'FilteredScan':
        """Set the exclusive end row key."""
        self.end_row = end_row
        return self

    def set_limit(self, limit: int) -> 'FilteredScan':
        """Set the maximum number of rows to return."""
        self.limit = limit
        return self

    def execute(self) -> OperationResult:
        """Run the scan and record the outcome in the table's history.

        Rows are visited in sorted row-key order. A falsy ``start_row`` or
        ``end_row`` means "unbounded" on that side. The limit is checked
        before a row is considered, so a limit of zero (or less) returns
        no rows instead of letting the first match through.

        Returns:
            An ``OperationResult`` of type ``OperationType.SCAN``. On
            success ``data`` holds ``results``, ``count`` and
            ``filters_applied``. On any exception the connection's
            ``error_count`` is incremented and a failed result carrying
            the error message is returned instead of raising.
        """
        start_time = time.time()

        try:
            if not self.table.connection.is_connected():
                raise Exception("连接未建立")

            store = self.table._data_store
            filtered_results = []

            for row_key in sorted(store.keys()):
                if len(filtered_results) >= self.limit:
                    break
                if self.start_row and row_key < self.start_row:
                    continue
                if self.end_row and row_key >= self.end_row:
                    # Keys are sorted, so every remaining key is out of range too.
                    break

                row_data = store[row_key]
                if all(f.filter_row(row_key, row_data) for f in self.filters):
                    filtered_results.append({
                        "row_key": row_key,
                        "data": row_data
                    })

            self.table.connection._update_activity()

            result = OperationResult(
                success=True,
                operation_type=OperationType.SCAN,
                table_name=self.table.table_name,
                data={
                    "results": filtered_results,
                    "count": len(filtered_results),
                    "filters_applied": [f.get_filter_info() for f in self.filters]
                },
                execution_time=round((time.time() - start_time) * 1000, 2)
            )

        except Exception as e:
            self.table.connection.error_count += 1

            result = OperationResult(
                success=False,
                operation_type=OperationType.SCAN,
                table_name=self.table.table_name,
                error_message=str(e),
                execution_time=round((time.time() - start_time) * 1000, 2)
            )

        self.table.operation_history.append(result)
        return result

# Filter usage example (reuses users_table from the earlier examples)
print("\n=== 过滤器操作示例 ===")

# 1. Row-key filter: keys matching the regex user00[1-2]
row_filter = RowKeyFilter(CompareOperator.REGEX, r"user00[1-2]")
filtered_scan = FilteredScan(users_table).set_filter(row_filter)
result = filtered_scan.execute()
# NOTE(review): result.data is indexed without checking result.success.
print(f"行键过滤结果: {result.data['count']} 行")
for row in result.data['results']:
    print(f"  {row['row_key']}")

# 2. Value filter: rows whose info:age is greater than 25
value_filter = ValueFilter("info", "age", CompareOperator.GREATER, "25")
filtered_scan = FilteredScan(users_table).set_filter(value_filter)
result = filtered_scan.execute()
print(f"\n年龄大于25的用户: {result.data['count']} 行")
for row in result.data['results']:
    # Fall back to 'N/A' when a column is missing from the row
    age = row['data'].get('info:age', {}).get('value', 'N/A')
    name = row['data'].get('info:name', {}).get('value', 'N/A')
    print(f"  {row['row_key']}: {name}, 年龄: {age}")

3. 性能优化与最佳实践

3.1 连接池管理

from threading import Lock
from queue import Queue, Empty
import threading

class ConnectionPool:
    """Bounded pool of HBase connections.

    Idle connections live in a ``queue.Queue``; ``_lock`` guards only the
    two counters. The lock is never held while blocking on the queue or
    while opening/closing a connection, so a thread waiting in
    ``get_connection`` cannot stop another thread from returning one.
    """

    def __init__(self, max_connections: int = 10,
                 connection_config: Optional[ConnectionConfig] = None):
        self.max_connections = max_connections
        self.connection_config = connection_config or ConnectionConfig()
        self._pool: Queue[HBaseConnection] = Queue(maxsize=max_connections)
        self._lock = Lock()
        self._created_connections = 0  # connections currently owned by the pool
        self._active_connections = 0   # connections currently checked out

        # Pre-create a few connections.
        self._initialize_pool()

    def _initialize_pool(self):
        """Pre-create up to three connections (fewer if the pool is smaller)."""
        initial_size = min(3, self.max_connections)
        for _ in range(initial_size):
            conn = self._create_connection()
            if conn is not None:
                self._pool.put(conn)

    def _open_connection(self) -> Optional[HBaseConnection]:
        """Open one connection without touching the counters.

        Returns ``None`` when creation raises or the connection does not
        report itself as connected afterwards.
        """
        try:
            conn = HBaseConnection(self.connection_config)
            conn.connect()
        except Exception as e:
            print(f"创建连接失败: {e}")
            return None
        return conn if conn.is_connected() else None

    def _create_connection(self) -> Optional[HBaseConnection]:
        """Open a connection and count it as created; ``None`` on failure."""
        conn = self._open_connection()
        if conn is not None:
            with self._lock:
                self._created_connections += 1
        return conn

    def _create_if_capacity(self) -> Optional[HBaseConnection]:
        """Open a new connection only if the pool is below ``max_connections``."""
        with self._lock:
            if self._created_connections >= self.max_connections:
                return None
            # Reserve the slot before the slow connect so concurrent callers
            # cannot overshoot the maximum.
            self._created_connections += 1

        conn = self._open_connection()
        if conn is None:
            with self._lock:
                self._created_connections -= 1
        return conn

    def _take_idle(self, timeout: float) -> Optional[HBaseConnection]:
        """Take an idle connection, waiting only when the pool is at capacity."""
        try:
            return self._pool.get_nowait()
        except Empty:
            pass

        with self._lock:
            at_capacity = self._created_connections >= self.max_connections
        if not at_capacity:
            # Opening a new connection beats waiting for one to come back.
            return None

        try:
            return self._pool.get(timeout=timeout)
        except Empty:
            return None

    def get_connection(self, timeout: float = 5.0) -> Optional[HBaseConnection]:
        """Check a connection out of the pool.

        Args:
            timeout: Seconds to wait for a returned connection when the
                pool is already at ``max_connections``.

        Returns:
            A connected ``HBaseConnection``, or ``None`` if none became
            available in time and no new one could be opened.
        """
        conn = self._take_idle(timeout)

        if conn is not None and not conn.is_connected():
            # Stale connection: release its slot so a replacement can be
            # opened without inflating the created count.
            with self._lock:
                if self._created_connections > 0:
                    self._created_connections -= 1
            conn = None

        if conn is None:
            conn = self._create_if_capacity()
        if conn is None:
            return None

        with self._lock:
            self._active_connections += 1
        return conn

    def return_connection(self, conn: HBaseConnection):
        """Give a checked-out connection back to the pool.

        A live connection is re-queued. A dead one, or one that does not
        fit because the pool is full, is dropped and its slot released.
        Passing ``None`` is a no-op.
        """
        from queue import Full  # local import: the file imports only Queue and Empty

        if conn is None:
            return

        with self._lock:
            if self._active_connections > 0:
                self._active_connections -= 1

        if conn.is_connected():
            try:
                self._pool.put_nowait(conn)
                return
            except Full:
                # Pool is full: close the surplus connection.
                conn.disconnect()

        with self._lock:
            if self._created_connections > 0:
                self._created_connections -= 1

    def close_all(self):
        """Disconnect every idle connection and reset both counters."""
        while True:
            try:
                conn = self._pool.get_nowait()
            except Empty:
                break
            conn.disconnect()

        with self._lock:
            self._created_connections = 0
            self._active_connections = 0

    def get_pool_stats(self) -> Dict[str, Any]:
        """Return a snapshot of the pool counters.

        ``pool_utilization`` is the active share of ``max_connections`` in
        percent, reported as ``0.0`` when ``max_connections`` is zero.
        """
        with self._lock:
            if self.max_connections:
                utilization = round(self._active_connections / self.max_connections * 100, 2)
            else:
                utilization = 0.0
            return {
                "max_connections": self.max_connections,
                "created_connections": self._created_connections,
                "active_connections": self._active_connections,
                "available_connections": self._pool.qsize(),
                "pool_utilization": utilization
            }

class PooledHBaseClient:
    """HBase client that borrows a pooled connection for each operation."""

    def __init__(self, connection_pool: ConnectionPool):
        self.pool = connection_pool

    def execute_operation(self, operation_func, *args, **kwargs):
        """Call ``operation_func(conn, *args, **kwargs)`` with a borrowed connection.

        The connection is handed back to the pool whether or not the
        callable raises. Raises ``Exception`` when the pool yields no
        connection.
        """
        borrowed = self.pool.get_connection()
        if borrowed:
            try:
                return operation_func(borrowed, *args, **kwargs)
            finally:
                # Always hand the connection back.
                self.pool.return_connection(borrowed)
        raise Exception("无法获取数据库连接")

    def get_table(self, table_name: str) -> 'PooledHBaseTable':
        """Return a pooled table wrapper for ``table_name``."""
        pooled_table = PooledHBaseTable(table_name, self.pool)
        return pooled_table

class PooledHBaseTable:
    """Table wrapper that borrows a pooled connection for every operation."""

    def __init__(self, table_name: str, connection_pool: ConnectionPool):
        self.table_name = table_name
        self.pool = connection_pool

    def put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> OperationResult:
        """Run a PUT on a borrowed connection.

        Raises:
            Exception: if the pool yields no connection.
        """
        def _put_operation(conn: HBaseConnection, rk, cf, q, v):
            # NOTE(review): HBaseClient.get_table calls
            # HBaseTable(connection, table_name); the argument order here is
            # reversed — confirm against HBaseTable.__init__.
            table = HBaseTable(self.table_name, conn)
            return table.put(rk, cf, q, v)

        # Borrow exactly once. The previous
        # `self.pool.get_connection() and ...` guard checked out an extra
        # connection that was never returned to the pool.
        return self._execute_with_connection(_put_operation, row_key, column_family, qualifier, value)

    def get(self, row_key: str, column_family: Optional[str] = None,
           qualifier: Optional[str] = None) -> OperationResult:
        """Run a GET on a borrowed connection.

        Raises:
            Exception: if the pool yields no connection.
        """
        def _get_operation(conn: HBaseConnection, rk, cf, q):
            # NOTE(review): same reversed HBaseTable argument order as in put().
            table = HBaseTable(self.table_name, conn)
            return table.get(rk, cf, q)

        return self._execute_with_connection(_get_operation, row_key, column_family, qualifier)

    def _execute_with_connection(self, operation_func, *args, **kwargs):
        """Borrow a connection, call ``operation_func(conn, ...)``, always return it."""
        conn = self.pool.get_connection()
        if not conn:
            raise Exception("无法获取数据库连接")

        try:
            return operation_func(conn, *args, **kwargs)
        finally:
            self.pool.return_connection(conn)

# Connection pool usage example
print("\n=== 连接池使用示例 ===")

# Create the pool (uses a default ConnectionConfig)
pool = ConnectionPool(max_connections=5)
print(f"连接池创建完成: {pool.get_pool_stats()}")

# Create the pooled client and a table wrapper
pooled_client = PooledHBaseClient(pool)
pooled_table = pooled_client.get_table("users")

# Run a put followed by a get
result = pooled_table.put("user004", "info", "name", "赵六")
print(f"PUT操作结果: {'成功' if result.success else '失败'}")

result = pooled_table.get("user004")
print(f"GET操作结果: {'成功' if result.success else '失败'}")

# Inspect the pool state
print(f"连接池状态: {pool.get_pool_stats()}")

# Close the pool
pool.close_all()

3.2 批量操作优化

class OptimizedBatchOperation:
    """Buffered batch writer that flushes once ``batch_size`` operations queue up.

    Operations are buffered by ``add_put`` / ``add_delete``. With
    auto-flush enabled (the default) the buffer is flushed as soon as it
    holds ``batch_size`` entries; ``flush`` can also be called directly.
    Running totals are available from ``get_statistics``.
    """

    def __init__(self, table: HBaseTable, batch_size: int = 100):
        self.table = table
        self.batch_size = batch_size
        self.operations: List[Dict[str, Any]] = []  # buffered, not yet executed
        self.auto_flush = True
        self.total_operations = 0        # operations handed to flush() so far
        self.successful_operations = 0   # of those, how many reported success

    def set_auto_flush(self, auto_flush: bool) -> 'OptimizedBatchOperation':
        """Enable or disable flushing when the buffer reaches ``batch_size``."""
        self.auto_flush = auto_flush
        return self

    def _enqueue(self, op: Dict[str, Any]) -> 'OptimizedBatchOperation':
        """Buffer one operation and auto-flush if the buffer is full."""
        self.operations.append(op)
        if self.auto_flush and len(self.operations) >= self.batch_size:
            self.flush()
        return self

    def add_put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> 'OptimizedBatchOperation':
        """Buffer a PUT; returns this object for chaining."""
        return self._enqueue({
            "type": "put",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier,
            "value": value
        })

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                  qualifier: Optional[str] = None) -> 'OptimizedBatchOperation':
        """Buffer a DELETE; returns this object for chaining."""
        return self._enqueue({
            "type": "delete",
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier
        })

    def _apply(self, op: Dict[str, Any]) -> OperationResult:
        """Run one buffered operation against the table.

        Raises:
            ValueError: if ``op["type"]`` is neither "put" nor "delete".
                Previously such an entry hit an unbound local or reused
                the previous operation's result.
        """
        kind = op["type"]
        if kind == "put":
            return self.table.put(
                op["row_key"],
                op["column_family"],
                op["qualifier"],
                op["value"]
            )
        if kind == "delete":
            return self.table.delete(
                op["row_key"],
                op["column_family"],
                op["qualifier"]
            )
        raise ValueError(f"unsupported batch operation type: {kind!r}")

    def flush(self) -> List[OperationResult]:
        """Execute and discard every buffered operation.

        Operations run sorted by row key; the sort is stable, so
        operations on the same row keep their insertion order. If an
        operation raises, the error is printed, the remaining buffered
        operations are discarded, and the results gathered so far are
        returned.

        Returns:
            Results of the operations that ran; ``[]`` if the buffer was
            empty.
        """
        if not self.operations:
            return []

        results: List[OperationResult] = []
        pending = sorted(self.operations, key=lambda x: x["row_key"])

        # Count the whole batch up front so an aborted flush cannot leave
        # successful_operations ahead of total_operations.
        self.total_operations += len(pending)

        try:
            for op in pending:
                result = self._apply(op)
                results.append(result)
                if result.success:
                    self.successful_operations += 1
        except Exception as e:
            print(f"批量操作执行失败: {e}")

        # The buffer is emptied whether or not the flush completed.
        self.operations.clear()
        return results

    def get_statistics(self) -> Dict[str, Any]:
        """Return totals, success rate (percent) and the current buffer size."""
        return {
            "total_operations": self.total_operations,
            "successful_operations": self.successful_operations,
            # max(..., 1) avoids dividing by zero before the first flush.
            "success_rate": round(self.successful_operations / max(self.total_operations, 1) * 100, 2),
            "pending_operations": len(self.operations)
        }

# Optimized batch example (reuses users_table from the earlier examples)
print("\n=== 优化批量操作示例 ===")

optimized_batch = OptimizedBatchOperation(users_table, batch_size=3)
optimized_batch.set_auto_flush(True)

# Queue 20 puts; the buffer auto-flushes every 3 operations
for i in range(10):
    optimized_batch.add_put(f"batch_user_{i:03d}", "info", "name", f"批量用户{i}")
    optimized_batch.add_put(f"batch_user_{i:03d}", "info", "age", str(20 + i))

# Flush whatever is still buffered
optimized_batch.flush()

# Print the statistics
stats = optimized_batch.get_statistics()
print(f"批量操作统计: {stats}")

3.3 错误处理与重试机制

import random
from functools import wraps

class RetryConfig:
    """Settings for retrying with exponential backoff.

    Attributes are plain values: the retry count, the first delay in
    seconds, the delay ceiling in seconds, and the per-attempt multiplier.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 60.0, backoff_factor: float = 2.0):
        # How many times to retry, and how the delay starts.
        self.max_retries, self.base_delay = max_retries, base_delay
        # Upper bound on the delay, and how fast it grows.
        self.max_delay, self.backoff_factor = max_delay, backoff_factor

def retry_on_failure(retry_config: "RetryConfig"):
    """Build a decorator that retries a callable with exponential backoff.

    The wrapped callable is always invoked at least once. After each
    failure that still has retries left, the wrapper sleeps for
    ``min(base_delay * backoff_factor ** attempt, max_delay)`` seconds plus
    up to 10% random jitter, then tries again.

    Args:
        retry_config: Object providing ``max_retries``, ``base_delay``,
            ``max_delay`` and ``backoff_factor``. A negative
            ``max_retries`` is treated as zero (one attempt, no retries).

    Returns:
        A decorator. The decorated function returns the wrapped callable's
        result, or re-raises the exception from the final attempt.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Clamp so a negative setting cannot skip the call entirely,
            # which previously ended in `raise None` (a TypeError).
            max_retries = max(retry_config.max_retries, 0)

            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt >= max_retries:
                        print(f"操作最终失败,已重试 {max_retries} 次")
                        # Bare raise keeps the original exception and traceback.
                        raise

                    # Exponential backoff capped at max_delay, plus jitter.
                    delay = min(
                        retry_config.base_delay * (retry_config.backoff_factor ** attempt),
                        retry_config.max_delay
                    )
                    jitter = random.uniform(0, delay * 0.1)  # up to 10% jitter
                    total_delay = delay + jitter

                    print(f"操作失败,{total_delay:.2f}秒后重试 (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(total_delay)
        return wrapper
    return decorator

class RobustHBaseClient:
    """HBase client with retrying connect and basic error bookkeeping.

    Every failed connection attempt increments ``error_count`` and stamps
    ``last_error_time``, so ``get_error_statistics`` reflects what
    actually happened.
    """

    def __init__(self, connection_config: ConnectionConfig, retry_config: RetryConfig):
        self.connection_config = connection_config
        # NOTE(review): forwarded to RobustHBaseTable, but the decorators on
        # this class use their own hard-coded RetryConfig — confirm intent.
        self.retry_config = retry_config
        self.connection: Optional[HBaseConnection] = None
        self.error_count = 0
        self.last_error_time: Optional[float] = None  # time.time() of the last failure

    def _record_error(self) -> None:
        """Count one failure and remember when it happened."""
        self.error_count += 1
        self.last_error_time = time.time()

    @retry_on_failure(RetryConfig(max_retries=3))
    def connect(self):
        """Connect to HBase unless already connected.

        Raises:
            Exception: if the connection is still not established after
                the decorator's retries are exhausted.
        """
        if self.connection and self.connection.is_connected():
            return

        try:
            self.connection = HBaseConnection(self.connection_config)
            self.connection.connect()

            if not self.connection.is_connected():
                raise Exception("连接失败")
        except Exception:
            # Record each failed attempt, then let the decorator decide
            # whether to retry.
            self._record_error()
            raise

    def _ensure_connected(self):
        """Connect if there is no live connection."""
        if not self.connection or not self.connection.is_connected():
            self.connect()

    @retry_on_failure(RetryConfig(max_retries=2))
    def get_table(self, table_name: str) -> 'RobustHBaseTable':
        """Return a retrying table wrapper, connecting first if necessary."""
        self._ensure_connected()
        return RobustHBaseTable(table_name, self.connection, self.retry_config)

    def get_error_statistics(self) -> Dict[str, Any]:
        """Return the error counters and, if a connection exists, its info."""
        return {
            "error_count": self.error_count,
            "last_error_time": self.last_error_time,
            "connection_status": self.connection.get_connection_info() if self.connection else None
        }

class RobustHBaseTable:
    """Table handle whose data operations are wrapped by ``retry_on_failure``.

    Every operation is forwarded unchanged to an internal ``HBaseTable``;
    this class only adds the retry decorators around those calls.
    """

    def __init__(self, table_name: str, connection: HBaseConnection, retry_config: RetryConfig):
        """Create the underlying ``HBaseTable`` and keep the given settings.

        Args:
            table_name: Name of the wrapped table.
            connection: Connection the underlying table is built on.
            retry_config: Stored on the instance as ``retry_config``.
        """
        self._table = HBaseTable(table_name, connection)
        self.retry_config = retry_config
        self.connection = connection
        self.table_name = table_name

    @retry_on_failure(RetryConfig(max_retries=2))
    def put(self, row_key: str, column_family: str, qualifier: str, value: Any) -> OperationResult:
        """Write one cell by delegating to the underlying table's ``put``."""
        delegate = self._table
        outcome = delegate.put(row_key, column_family, qualifier, value)
        return outcome

    @retry_on_failure(RetryConfig(max_retries=2))
    def get(self, row_key: str, column_family: Optional[str] = None, 
           qualifier: Optional[str] = None) -> OperationResult:
        """Read a row (optionally narrowed to a family/qualifier) via the underlying table."""
        delegate = self._table
        outcome = delegate.get(row_key, column_family, qualifier)
        return outcome

    @retry_on_failure(RetryConfig(max_retries=1))
    def scan(self, start_row: Optional[str] = None, end_row: Optional[str] = None, 
            limit: int = 100) -> OperationResult:
        """Scan the row range via the underlying table, passing ``limit`` through."""
        delegate = self._table
        outcome = delegate.scan(start_row, end_row, limit)
        return outcome

# Error-handling example: drive RobustHBaseClient / RobustHBaseTable end to end.
print("\n=== 错误处理与重试示例 ===")

# Retry settings handed to the client, plus a default-constructed ConnectionConfig.
retry_config = RetryConfig(max_retries=2, base_delay=0.5)
robust_client = RobustHBaseClient(ConnectionConfig(), retry_config)

try:
    robust_client.connect()
    robust_table = robust_client.get_table("users")
    
    # Perform one write followed by one read and report each outcome.
    result = robust_table.put("robust_user", "info", "name", "健壮用户")
    print(f"PUT操作: {'成功' if result.success else '失败'}")
    
    result = robust_table.get("robust_user")
    print(f"GET操作: {'成功' if result.success else '失败'}")
    
except Exception as e:
    # Any exception raised by the calls above ends up here; print the error
    # and the client's error statistics.
    print(f"操作失败: {e}")
    print(f"错误统计: {robust_client.get_error_statistics()}")

4. 总结

4.1 关键要点

  1. HBase Shell基础

    • 掌握基本的DDL和DML命令
    • 理解命令语法和参数
    • 熟练使用过滤器和扫描
  2. Java API操作

    • 连接管理和配置
    • 表操作和数据操作
    • 批量操作和过滤器
  3. 性能优化

    • 使用连接池管理连接
    • 批量操作提高吞吐量
    • 合理设置批次大小
  4. 错误处理

    • 实现重试机制
    • 监控连接状态
    • 记录和分析错误

4.2 最佳实践

  1. 连接管理

    • 使用连接池避免频繁创建连接
    • 及时释放不用的连接
    • 监控连接池状态
  2. 批量操作

    • 合理设置批次大小(通常100-1000)
    • 按行键排序提高性能
    • 使用自动刷新机制
  3. 错误处理

    • 实现指数退避重试
    • 添加随机抖动避免惊群效应
    • 记录详细的错误信息
  4. 性能监控

    • 监控操作延迟
    • 跟踪成功率
    • 分析性能瓶颈

4.3 下一步学习

  • HBase高级操作与客户端API:深入学习协处理器、计数器等高级功能
  • HBase集群管理:了解集群部署、监控和运维
  • HBase性能调优:学习存储优化、查询优化等技术
  • HBase与其他系统集成:探索与Spark、MapReduce等的集成方案

3. 组合过滤器

# Combine a row-key regex filter and a value filter under a single "AND" list.
filter_list = FilterList("AND")
(
    filter_list
    .add_filter(RowKeyFilter(CompareOperator.REGEX, r"user.*"))
    .add_filter(ValueFilter("info", "age", CompareOperator.LESS_OR_EQUAL, "30"))
)

# Run the scan with the combined filter and print every matching row.
filtered_scan = FilteredScan(users_table).set_filter(filter_list)
result = filtered_scan.execute()
print(f"\n组合过滤结果: {result.data['count']} 行")
for row in result.data['results']:
    # Missing cells fall back to 'N/A' instead of raising KeyError.
    age = row['data'].get('info:age', {}).get('value', 'N/A')
    name = row['data'].get('info:name', {}).get('value', 'N/A')
    print(f" {row['row_key']}: {name}, 年龄: {age}")