概述

本章将深入探讨HBase的高级操作和客户端API,包括批量操作、过滤器、计数器、协处理器等高级功能。这些功能能够帮助开发者构建更高效、更强大的HBase应用程序。

学习目标

  • 掌握HBase批量操作的使用方法
  • 理解各种过滤器的应用场景
  • 学会使用计数器进行原子操作
  • 了解协处理器的工作原理
  • 掌握客户端连接池和性能优化
  • 学习异步操作和并发处理

1. 批量操作

1.1 批量操作基础

from typing import List, Dict, Any, Optional, Union
from enum import Enum
from dataclasses import dataclass
import time
import threading
from concurrent.futures import ThreadPoolExecutor, Future

class BatchOperationType(Enum):
    """Kinds of mutation a BatchOperation can describe.

    The string values are the serialized names of each kind.
    """
    PUT = "put"
    DELETE = "delete"
    INCREMENT = "increment"
    APPEND = "append"
    CHECK_AND_PUT = "check_and_put"
    CHECK_AND_DELETE = "check_and_delete"

class BatchExecutionMode(Enum):
    """Strategies a batch client can use to run its queued operations."""
    SEQUENTIAL = "sequential"  # run operations one after another
    PARALLEL = "parallel"     # run operations concurrently
    MIXED = "mixed"           # combine both strategies

@dataclass
class BatchOperation:
    """Description of one queued mutation against a single cell or row.

    Only the first four fields are required; the rest default to None and
    are filled in depending on the operation type.
    """
    operation_type: BatchOperationType  # which kind of mutation this is
    row_key: str                        # target row
    column_family: str                  # target column family
    qualifier: str                      # target column qualifier
    value: Any = None                   # payload (value to write, or amount to increment by)
    timestamp: Optional[int] = None     # optional cell timestamp; units not specified here
    ttl: Optional[int] = None           # optional time-to-live; units not specified here
    check_family: Optional[str] = None      # column family of the cell tested by check-and-* operations
    check_qualifier: Optional[str] = None   # qualifier of the cell tested by check-and-* operations
    check_value: Any = None                 # value the tested cell is compared against
    operation_id: Optional[str] = None      # caller- or client-assigned identifier echoed back in results

@dataclass
class BatchResult:
    """Outcome of executing a single batch operation."""
    operation_id: str                   # identifier of the operation this result belongs to
    success: bool                       # True when the operation completed successfully
    operation_type: BatchOperationType  # kind of operation that was executed
    row_key: str                        # row the operation targeted
    execution_time: float               # elapsed time, in seconds (difference of time.time() values)
    error_message: Optional[str] = None            # failure description, None on success
    result_data: Optional[Dict[str, Any]] = None   # optional extra payload returned by the operation

class HBaseBatchClient:
    """Queue HBase-style mutations and execute them together as a batch.

    Operations are accumulated with the fluent ``add_*`` methods and run by
    ``execute``. The individual ``_execute_put`` / ``_execute_delete`` / ...
    methods only simulate network latency with ``time.sleep``; no real
    HBase call is made and ``connection`` is stored but never used here.

    ``batch_size`` is stored and reported by ``get_statistics`` but does
    not currently influence how operations are executed.
    """

    def __init__(self, connection, batch_size: int = 100,
                 execution_mode: BatchExecutionMode = BatchExecutionMode.SEQUENTIAL):
        """Create an empty batch client.

        Args:
            connection: Opaque connection handle; only stored.
            batch_size: Informational batch size (see class docstring).
            execution_mode: How ``execute`` runs the queued operations.
        """
        self.connection = connection
        self.batch_size = batch_size
        self.execution_mode = execution_mode
        self.operations: List[BatchOperation] = []
        # Kept for backward compatibility; this class never fills it.
        self.results: List[BatchResult] = []
        self.total_operations = 0
        self.successful_operations = 0
        self.failed_operations = 0
        # Guards the pending queue and the running counters.
        self._lock = threading.Lock()

    def _enqueue(self, id_prefix: str, **fields: Any) -> 'HBaseBatchClient':
        """Build a BatchOperation and append it to the queue atomically.

        The operation id is derived from the queue length, so it must be
        computed while holding the lock; otherwise two threads adding at
        the same time could be handed the same id.
        """
        with self._lock:
            operation = BatchOperation(
                operation_id=f"{id_prefix}_{len(self.operations)}",
                **fields,
            )
            self.operations.append(operation)
        return self

    def add_put(self, row_key: str, column_family: str, qualifier: str,
               value: Any, timestamp: Optional[int] = None,
               ttl: Optional[int] = None) -> 'HBaseBatchClient':
        """Queue a PUT of ``value`` into ``column_family:qualifier``.

        Returns:
            This client, so calls can be chained.
        """
        return self._enqueue(
            "put",
            operation_type=BatchOperationType.PUT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=value,
            timestamp=timestamp,
            ttl=ttl,
        )

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                  qualifier: Optional[str] = None,
                  timestamp: Optional[int] = None) -> 'HBaseBatchClient':
        """Queue a DELETE.

        A missing ``column_family`` or ``qualifier`` is stored as an empty
        string on the queued operation.

        Returns:
            This client, so calls can be chained.
        """
        return self._enqueue(
            "delete",
            operation_type=BatchOperationType.DELETE,
            row_key=row_key,
            column_family=column_family or "",
            qualifier=qualifier or "",
            timestamp=timestamp,
        )

    def add_increment(self, row_key: str, column_family: str, qualifier: str,
                     increment_value: int = 1) -> 'HBaseBatchClient':
        """Queue an INCREMENT of ``increment_value`` (stored in ``value``).

        Returns:
            This client, so calls can be chained.
        """
        return self._enqueue(
            "increment",
            operation_type=BatchOperationType.INCREMENT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=increment_value,
        )

    def add_check_and_put(self, row_key: str, column_family: str, qualifier: str,
                         value: Any, check_family: str, check_qualifier: str,
                         check_value: Any) -> 'HBaseBatchClient':
        """Queue a CHECK_AND_PUT guarded by ``check_family:check_qualifier``.

        Returns:
            This client, so calls can be chained.
        """
        return self._enqueue(
            "check_and_put",
            operation_type=BatchOperationType.CHECK_AND_PUT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=value,
            check_family=check_family,
            check_qualifier=check_qualifier,
            check_value=check_value,
        )

    def execute(self, table_name: str) -> List[BatchResult]:
        """Run every queued operation and return one result per operation.

        The queue is snapshotted and emptied under the lock before any work
        starts, so operations added by other threads while the batch is
        running stay queued for the next call instead of being discarded.

        Args:
            table_name: Table the operations are executed against.

        Returns:
            Results in queue order for SEQUENTIAL and PARALLEL mode; in
            MIXED mode all PUT results come first, then the others.
            An empty list when nothing was queued.
        """
        with self._lock:
            pending = list(self.operations)
            self.operations.clear()

        if not pending:
            return []

        start_time = time.time()

        if self.execution_mode == BatchExecutionMode.SEQUENTIAL:
            results = self._execute_sequential(table_name, pending)
        elif self.execution_mode == BatchExecutionMode.PARALLEL:
            results = self._execute_parallel(table_name, pending)
        else:
            results = self._execute_mixed(table_name, pending)

        succeeded = sum(1 for r in results if r.success)
        with self._lock:
            self.total_operations += len(results)
            self.successful_operations += succeeded
            self.failed_operations += len(results) - succeeded

        execution_time = time.time() - start_time
        print(f"批量操作完成: {len(results)} 个操作,耗时 {execution_time:.2f} 秒")

        return results

    def _execute_sequential(self, table_name: str,
                            operations: Optional[List[BatchOperation]] = None) -> List[BatchResult]:
        """Run operations one by one (defaults to the pending queue)."""
        pending = self.operations if operations is None else operations
        return self._execute_operations_sequential(table_name, pending)

    def _execute_parallel(self, table_name: str,
                          operations: Optional[List[BatchOperation]] = None) -> List[BatchResult]:
        """Run operations on up to 10 worker threads (defaults to the pending queue)."""
        pending = self.operations if operations is None else operations
        return self._execute_operations_parallel(table_name, pending, max_workers=10)

    def _execute_mixed(self, table_name: str,
                       operations: Optional[List[BatchOperation]] = None) -> List[BatchResult]:
        """Run PUTs in parallel, then every other operation sequentially."""
        pending = self.operations if operations is None else operations
        put_operations = [op for op in pending if op.operation_type == BatchOperationType.PUT]
        other_operations = [op for op in pending if op.operation_type != BatchOperationType.PUT]

        results: List[BatchResult] = []
        if put_operations:
            results.extend(self._execute_operations_parallel(table_name, put_operations))
        if other_operations:
            results.extend(self._execute_operations_sequential(table_name, other_operations))
        return results

    def _execute_operations_parallel(self, table_name: str, operations: List[BatchOperation],
                                     max_workers: int = 5) -> List[BatchResult]:
        """Run ``operations`` on a thread pool and collect results in input order.

        Args:
            table_name: Table the operations are executed against.
            operations: Operations to run.
            max_workers: Upper bound on worker threads; the pool is never
                larger than the number of operations.
        """
        if not operations:
            # ThreadPoolExecutor rejects max_workers=0.
            return []

        results: List[BatchResult] = []
        worker_count = min(max_workers, len(operations))

        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [
                executor.submit(self._execute_single_operation, table_name, op)
                for op in operations
            ]

            # Pair each future with its operation so a timeout or worker
            # failure is reported against the real operation.
            for operation, future in zip(operations, futures):
                try:
                    results.append(future.result(timeout=30))
                except Exception as e:
                    results.append(BatchResult(
                        operation_id=operation.operation_id,
                        success=False,
                        operation_type=operation.operation_type,
                        row_key=operation.row_key,
                        execution_time=0,
                        # Some exceptions (e.g. timeouts) have an empty message.
                        error_message=str(e) or type(e).__name__,
                    ))

        return results

    def _execute_operations_sequential(self, table_name: str, operations: List[BatchOperation]) -> List[BatchResult]:
        """Run ``operations`` one after another, in order."""
        return [self._execute_single_operation(table_name, op) for op in operations]

    def _execute_single_operation(self, table_name: str, operation: BatchOperation) -> BatchResult:
        """Execute one operation and wrap the outcome in a BatchResult.

        Never raises: any exception from the handler is captured in the
        result's ``error_message`` so one bad operation cannot abort the
        rest of the batch. Operation types without a handler are reported
        as failures with an explanatory message.
        """
        start_time = time.time()
        error_message: Optional[str] = None

        try:
            handlers = {
                BatchOperationType.PUT: self._execute_put,
                BatchOperationType.DELETE: self._execute_delete,
                BatchOperationType.INCREMENT: self._execute_increment,
                BatchOperationType.CHECK_AND_PUT: self._execute_check_and_put,
            }
            handler = handlers.get(operation.operation_type)
            if handler is None:
                success = False
                error_message = f"unsupported operation type: {operation.operation_type}"
            else:
                success = handler(table_name, operation)
        except Exception as e:
            success = False
            error_message = str(e) or type(e).__name__

        return BatchResult(
            operation_id=operation.operation_id,
            success=success,
            operation_type=operation.operation_type,
            row_key=operation.row_key,
            execution_time=time.time() - start_time,
            error_message=error_message,
        )

    def _execute_put(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulate a PUT; always succeeds after a short delay."""
        time.sleep(0.01)  # simulated network latency
        return True

    def _execute_delete(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulate a DELETE; always succeeds after a short delay."""
        time.sleep(0.01)  # simulated network latency
        return True

    def _execute_increment(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulate an INCREMENT; always succeeds after a short delay."""
        time.sleep(0.02)  # simulated network latency
        return True

    def _execute_check_and_put(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulate a CHECK_AND_PUT; always succeeds after a short delay."""
        time.sleep(0.03)  # simulated network latency
        return True

    def get_statistics(self) -> Dict[str, Any]:
        """Return cumulative counters plus the current queue length.

        ``success_rate`` is a percentage rounded to two decimals and is 0
        when nothing has been executed yet.
        """
        with self._lock:
            total = self.total_operations
            succeeded = self.successful_operations
            failed = self.failed_operations
            pending = len(self.operations)

        return {
            "total_operations": total,
            "successful_operations": succeeded,
            "failed_operations": failed,
            "success_rate": round(succeeded / max(total, 1) * 100, 2),
            "pending_operations": pending,
            "execution_mode": self.execution_mode.value,
            "batch_size": self.batch_size
        }

    def clear(self) -> 'HBaseBatchClient':
        """Discard all queued operations without executing them."""
        with self._lock:
            self.operations.clear()
        return self

# Batch operation usage example (runs at module level).
print("=== HBase批量操作示例 ===")

# Create a batch client that executes its queue in parallel.
batch_client = HBaseBatchClient(None, batch_size=50, execution_mode=BatchExecutionMode.PARALLEL)

# Queue three PUTs per user, plus one INCREMENT for every tenth user.
for i in range(100):
    batch_client.add_put(f"user_{i:04d}", "info", "name", f"用户{i}")
    batch_client.add_put(f"user_{i:04d}", "info", "age", str(20 + i % 50))
    batch_client.add_put(f"user_{i:04d}", "stats", "login_count", "0")
    
    if i % 10 == 0:
        batch_client.add_increment(f"user_{i:04d}", "stats", "login_count", 1)

# Execute everything that was queued against the "users" table.
results = batch_client.execute("users")

# Split the results by outcome.
successful_results = [r for r in results if r.success]
failed_results = [r for r in results if not r.success]

print(f"批量操作结果:")
print(f"  总操作数: {len(results)}")
print(f"  成功操作: {len(successful_results)}")
print(f"  失败操作: {len(failed_results)}")
# The average below divides by len(results); results is non-empty here
# because operations were queued above.
print(f"  平均执行时间: {sum(r.execution_time for r in results) / len(results):.4f} 秒")

# Show the client's cumulative statistics.
stats = batch_client.get_statistics()
print(f"\n客户端统计信息: {stats}")

1.2 智能批量操作

class SmartBatchClient:
    """Batch client that auto-flushes and tunes its own batch size.

    Operations are buffered until ``batch_size`` is reached, then flushed.
    Each flush records throughput and success rate; once at least three
    flushes have been recorded the batch size is adjusted within
    ``[min_batch_size, max_batch_size]``.

    Execution is simulated: ``_simulate_operation`` sleeps briefly and
    fails randomly about 5% of the time. ``connection`` is only stored.
    """

    def __init__(self, connection, initial_batch_size: int = 100):
        """Create a client with an empty buffer.

        Args:
            connection: Opaque connection handle; only stored.
            initial_batch_size: Buffer length that triggers an auto-flush.
        """
        self.connection = connection
        self.batch_size = initial_batch_size
        self.operations: List[BatchOperation] = []
        self.performance_history: List[Dict[str, Any]] = []
        self.auto_optimize = True
        self.min_batch_size = 10
        self.max_batch_size = 1000

    def add_operation(self, operation: BatchOperation,
                      table_name: str = "default") -> 'SmartBatchClient':
        """Buffer ``operation`` and flush when the buffer is full.

        Args:
            operation: Operation to buffer.
            table_name: Table used if this call triggers an automatic
                flush. Defaults to ``"default"``, which matches the
                behavior of calling ``flush()`` with no argument.

        Returns:
            This client, so calls can be chained.
        """
        self.operations.append(operation)

        if len(self.operations) >= self.batch_size:
            self.flush(table_name)

        return self

    def flush(self, table_name: str = "default") -> List[BatchResult]:
        """Execute all buffered operations and record performance data.

        Args:
            table_name: Table the operations are executed against.

        Returns:
            One result per buffered operation, grouped by operation type
            (not necessarily in submission order). Empty when the buffer
            was empty.
        """
        if not self.operations:
            return []

        # perf_counter is monotonic and high-resolution; wall-clock
        # differences can be 0 on coarse clocks, which would make the
        # throughput division below fail.
        started = time.perf_counter()
        results = self._execute_batch(table_name)
        execution_time = time.perf_counter() - started

        flushed_count = len(self.operations)
        succeeded = sum(1 for r in results if r.success)

        self.performance_history.append({
            "batch_size": flushed_count,
            "execution_time": execution_time,
            # Clamp the divisor so a zero-length measurement cannot raise.
            "throughput": flushed_count / max(execution_time, 1e-9),
            "success_rate": succeeded / len(results),
            "timestamp": time.time()
        })

        if self.auto_optimize:
            self._optimize_batch_size()

        self.operations.clear()

        return results

    def _execute_batch(self, table_name: str) -> List[BatchResult]:
        """Run the buffer grouped by type: PUTs in parallel, others in order."""
        results: List[BatchResult] = []

        for operation_type, operations in self._group_operations_by_type().items():
            if operation_type == BatchOperationType.PUT:
                group_results = self._execute_parallel_operations(table_name, operations)
            else:
                group_results = self._execute_sequential_operations(table_name, operations)
            results.extend(group_results)

        return results

    def _group_operations_by_type(self) -> Dict[BatchOperationType, List[BatchOperation]]:
        """Group buffered operations by type, keeping first-seen type order."""
        groups: Dict[BatchOperationType, List[BatchOperation]] = {}
        for operation in self.operations:
            groups.setdefault(operation.operation_type, []).append(operation)
        return groups

    def _execute_parallel_operations(self, table_name: str, operations: List[BatchOperation]) -> List[BatchResult]:
        """Run ``operations`` on up to 8 threads; results follow input order."""
        if not operations:
            # ThreadPoolExecutor rejects max_workers=0.
            return []

        results: List[BatchResult] = []
        max_workers = min(8, len(operations))

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self._simulate_operation, table_name, op) for op in operations]

            # Pair each future with its operation so a timeout or worker
            # failure is attributed to the real operation.
            for operation, future in zip(operations, futures):
                try:
                    results.append(future.result(timeout=10))
                except Exception as e:
                    results.append(BatchResult(
                        operation_id=operation.operation_id,
                        success=False,
                        operation_type=operation.operation_type,
                        row_key=operation.row_key,
                        execution_time=0,
                        # Some exceptions (e.g. timeouts) have an empty message.
                        error_message=str(e) or type(e).__name__,
                    ))

        return results

    def _execute_sequential_operations(self, table_name: str, operations: List[BatchOperation]) -> List[BatchResult]:
        """Run ``operations`` one after another, in order."""
        return [self._simulate_operation(table_name, op) for op in operations]

    def _simulate_operation(self, table_name: str, operation: BatchOperation) -> BatchResult:
        """Pretend to execute ``operation``.

        Sleeps for a type-dependent delay, then succeeds with roughly 95%
        probability. Failures carry a fixed simulated error message.
        """
        import random

        start_time = time.time()

        # Simulated per-type latency in seconds; anything else uses 0.010.
        delays = {
            BatchOperationType.PUT: 0.005,
            BatchOperationType.DELETE: 0.003,
            BatchOperationType.INCREMENT: 0.008,
        }
        time.sleep(delays.get(operation.operation_type, 0.010))

        execution_time = time.time() - start_time

        success = random.random() > 0.05

        return BatchResult(
            operation_id=operation.operation_id,
            success=success,
            operation_type=operation.operation_type,
            row_key=operation.row_key,
            execution_time=execution_time,
            error_message=None if success else "模拟错误"
        )

    def _optimize_batch_size(self):
        """Adjust ``batch_size`` from the three most recent flushes.

        Shrinks by 20% when the average success rate is below 0.9, grows
        by 20% when average throughput exceeds 1000 operations/second,
        and otherwise leaves the size alone. Does nothing until three
        flushes have been recorded.
        """
        if len(self.performance_history) < 3:
            return

        recent_data = self.performance_history[-3:]
        avg_throughput = sum(d["throughput"] for d in recent_data) / len(recent_data)
        avg_success_rate = sum(d["success_rate"] for d in recent_data) / len(recent_data)

        if avg_success_rate < 0.9:
            new_batch_size = max(self.min_batch_size, int(self.batch_size * 0.8))
        elif avg_throughput > 1000:
            new_batch_size = min(self.max_batch_size, int(self.batch_size * 1.2))
        else:
            new_batch_size = self.batch_size

        if new_batch_size != self.batch_size:
            print(f"批次大小优化: {self.batch_size} -> {new_batch_size}")
            self.batch_size = new_batch_size

    def get_performance_report(self) -> Dict[str, Any]:
        """Summarize the last ten flushes.

        Returns:
            Averages of throughput, success rate and execution time plus
            the current batch size, or a single ``"message"`` entry when
            no flush has happened yet.
        """
        if not self.performance_history:
            return {"message": "暂无性能数据"}

        recent_data = self.performance_history[-10:]
        sample_count = len(recent_data)

        return {
            "current_batch_size": self.batch_size,
            "total_batches": len(self.performance_history),
            "avg_throughput": sum(d["throughput"] for d in recent_data) / sample_count,
            "avg_success_rate": sum(d["success_rate"] for d in recent_data) / sample_count,
            "avg_execution_time": sum(d["execution_time"] for d in recent_data) / sample_count,
            "optimization_enabled": self.auto_optimize
        }

# Smart batch client usage example (runs at module level).
print("\n=== 智能批量操作示例 ===")

smart_client = SmartBatchClient(None, initial_batch_size=50)

# Buffer 500 PUT operations; the client flushes itself whenever the
# buffer reaches its current batch size.
for i in range(500):
    operation = BatchOperation(
        operation_type=BatchOperationType.PUT,
        row_key=f"smart_user_{i:04d}",
        column_family="info",
        qualifier="name",
        value=f"智能用户{i}",
        operation_id=f"smart_put_{i}"
    )
    smart_client.add_operation(operation)

# Flush whatever is still buffered.
# NOTE(review): only this explicit flush names the "users" table; flushes
# triggered inside add_operation above are not given a table name here.
smart_client.flush("users")

# Show the performance report.
report = smart_client.get_performance_report()
print(f"性能报告: {report}")

2. 高级过滤器

2.1 复合过滤器

from abc import ABC, abstractmethod
import re
from typing import Callable

class AdvancedFilterType(Enum):
    """Identifiers for the kinds of advanced filter."""
    SINGLE_COLUMN_VALUE = "single_column_value"
    MULTIPLE_COLUMN_VALUE = "multiple_column_value"
    ROW_KEY_REGEX = "row_key_regex"
    COLUMN_PREFIX = "column_prefix"
    TIMESTAMP_RANGE = "timestamp_range"
    CUSTOM_FUNCTION = "custom_function"
    FUZZY_ROW = "fuzzy_row"
    PAGE_FILTER = "page_filter"

class ComparisonOperator(Enum):
    """Comparison operators usable in value filters.

    The string values are the symbols/names used to look an operator up
    by value, e.g. ``ComparisonOperator(">")``.
    """
    EQUAL = "="
    NOT_EQUAL = "!="
    GREATER = ">"
    GREATER_OR_EQUAL = ">="
    LESS = "<"
    LESS_OR_EQUAL = "<="
    REGEX_MATCH = "~"
    SUBSTRING = "contains"
    STARTS_WITH = "starts_with"
    ENDS_WITH = "ends_with"

class LogicalOperator(Enum):
    """Logical operators for combining several filter results."""
    AND = "AND"
    OR = "OR"
    NOT = "NOT"

class AdvancedFilter(ABC):
    """Abstract base for row filters.

    Every filter carries its ``filter_type`` and an ``enabled`` switch
    that starts out True. Subclasses implement ``apply`` and
    ``get_description``.
    """

    def __init__(self, filter_type: AdvancedFilterType):
        self.enabled = True
        self.filter_type = filter_type

    @abstractmethod
    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row identified by ``row_key`` passes."""

    @abstractmethod
    def get_description(self) -> str:
        """Return a short human-readable description of this filter."""

    def _switch(self, state: bool) -> 'AdvancedFilter':
        # Shared implementation for enable()/disable(); returns self so
        # both remain chainable.
        self.enabled = state
        return self

    def enable(self) -> 'AdvancedFilter':
        """Turn the filter on and return it for chaining."""
        return self._switch(True)

    def disable(self) -> 'AdvancedFilter':
        """Turn the filter off and return it for chaining."""
        return self._switch(False)

class SingleColumnValueFilter(AdvancedFilter):
    """Filter rows by comparing one column's value against a constant.

    The column is addressed as ``"<column_family>:<qualifier>"`` in the
    row dictionary, and each cell is expected to be a mapping with a
    ``"value"`` entry.
    """

    def __init__(self, column_family: str, qualifier: str,
                 operator: ComparisonOperator, value: Any,
                 filter_if_missing: bool = False):
        super().__init__(AdvancedFilterType.SINGLE_COLUMN_VALUE)
        self.column_family = column_family
        self.qualifier = qualifier
        self.operator = operator
        self.value = value
        self.filter_if_missing = filter_if_missing
        self.column_name = f"{column_family}:{qualifier}"

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row passes.

        A disabled filter passes everything. A row lacking the column
        passes unless ``filter_if_missing`` is set.
        """
        if not self.enabled:
            return True

        if self.column_name not in row_data:
            return not self.filter_if_missing

        cell = row_data[self.column_name]
        return self._compare_values(cell["value"], self.value, self.operator)

    def _compare_values(self, cell_value: Any, filter_value: Any, operator: ComparisonOperator) -> bool:
        """Evaluate ``cell_value <operator> filter_value``.

        Equality and the string operators compare ``str()`` forms; the
        ordering operators compare ``float()`` forms. A value that cannot
        be converted, or an unrecognized operator, yields False.
        """
        # Each entry is evaluated lazily so only the selected comparison
        # performs its conversions.
        comparisons = {
            ComparisonOperator.EQUAL:
                lambda: str(cell_value) == str(filter_value),
            ComparisonOperator.NOT_EQUAL:
                lambda: str(cell_value) != str(filter_value),
            ComparisonOperator.GREATER:
                lambda: float(cell_value) > float(filter_value),
            ComparisonOperator.GREATER_OR_EQUAL:
                lambda: float(cell_value) >= float(filter_value),
            ComparisonOperator.LESS:
                lambda: float(cell_value) < float(filter_value),
            ComparisonOperator.LESS_OR_EQUAL:
                lambda: float(cell_value) <= float(filter_value),
            ComparisonOperator.REGEX_MATCH:
                lambda: bool(re.match(str(filter_value), str(cell_value))),
            ComparisonOperator.SUBSTRING:
                lambda: str(filter_value) in str(cell_value),
            ComparisonOperator.STARTS_WITH:
                lambda: str(cell_value).startswith(str(filter_value)),
            ComparisonOperator.ENDS_WITH:
                lambda: str(cell_value).endswith(str(filter_value)),
        }

        try:
            compare = comparisons.get(operator)
            if compare is None:
                return False
            return compare()
        except (ValueError, TypeError):
            return False

    def get_description(self) -> str:
        """Return ``SingleColumnValue(<column> <op> <value>)``."""
        symbol = self.operator.value
        return f"SingleColumnValue({self.column_name} {symbol} {self.value})"

class MultipleColumnValueFilter(AdvancedFilter):
    """Filter rows on several column conditions combined logically.

    Each condition is a dict with ``column_family``, ``qualifier``,
    ``operator`` (a ComparisonOperator value such as ``">"``) and
    ``value``, plus an optional ``filter_if_missing`` flag.
    """

    def __init__(self, conditions: List[Dict[str, Any]], logical_operator: LogicalOperator = LogicalOperator.AND):
        super().__init__(AdvancedFilterType.MULTIPLE_COLUMN_VALUE)
        self.conditions = conditions
        self.logical_operator = logical_operator

    def _check_condition(self, condition: Dict[str, Any], row_data: Dict[str, Any]) -> bool:
        """Evaluate a single condition dict against the row."""
        column_name = f"{condition['column_family']}:{condition['qualifier']}"

        if column_name not in row_data:
            # A missing column passes unless the condition says otherwise.
            return not condition.get('filter_if_missing', False)

        cell_value = row_data[column_name]["value"]
        operator = ComparisonOperator(condition['operator'])
        return self._compare_values(cell_value, condition['value'], operator)

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row passes.

        Every condition is evaluated before the outcomes are combined:
        AND needs all to hold, OR needs at least one, and NOT passes when
        not all of them hold. A disabled filter passes everything.
        """
        if not self.enabled:
            return True

        outcomes = [self._check_condition(cond, row_data) for cond in self.conditions]

        if self.logical_operator == LogicalOperator.AND:
            return all(outcomes)
        if self.logical_operator == LogicalOperator.OR:
            return any(outcomes)
        return not all(outcomes)  # NOT

    def _compare_values(self, cell_value: Any, filter_value: Any, operator: ComparisonOperator) -> bool:
        """Compare two values by delegating to SingleColumnValueFilter."""
        delegate = SingleColumnValueFilter("", "", operator, filter_value)
        return delegate._compare_values(cell_value, filter_value, operator)

    def get_description(self) -> str:
        """Return ``MultipleColumnValue(<cond> <OP> <cond> ...)``."""
        separator = f" {self.logical_operator.value} "
        parts = [
            f"{cond['column_family']}:{cond['qualifier']} {cond['operator']} {cond['value']}"
            for cond in self.conditions
        ]
        return f"MultipleColumnValue({separator.join(parts)})"

class RowKeyRegexFilter(AdvancedFilter):
    """Filter rows whose key matches a regular expression.

    The pattern is compiled once at construction and applied with
    ``match``, i.e. anchored at the start of the row key.
    """

    def __init__(self, pattern: str, flags: int = 0):
        super().__init__(AdvancedFilterType.ROW_KEY_REGEX)
        self.pattern = pattern
        self.flags = flags
        self.compiled_pattern = re.compile(pattern, flags)

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the filter is disabled or the row key matches."""
        if self.enabled:
            found = self.compiled_pattern.match(row_key)
            return found is not None
        return True

    def get_description(self) -> str:
        """Return ``RowKeyRegex(<pattern>)``."""
        return "RowKeyRegex({})".format(self.pattern)

class ColumnPrefixFilter(AdvancedFilter):
    """Filter rows that have at least one column starting with a prefix."""

    def __init__(self, prefix: str):
        super().__init__(AdvancedFilterType.COLUMN_PREFIX)
        self.prefix = prefix

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the filter is disabled or any column name
        in the row begins with the configured prefix."""
        if not self.enabled:
            return True

        wanted = self.prefix
        return any(name.startswith(wanted) for name in row_data.keys())

    def get_description(self) -> str:
        """Return ``ColumnPrefix(<prefix>)``."""
        return "ColumnPrefix({})".format(self.prefix)

class TimestampRangeFilter(AdvancedFilter):
    """Filter rows that have at least one cell inside a timestamp range.

    Both bounds are inclusive and optional; ``None`` means "unbounded"
    on that side. A bound of ``0`` is a real bound, not "unbounded".
    """

    def __init__(self, min_timestamp: Optional[int] = None, max_timestamp: Optional[int] = None):
        super().__init__(AdvancedFilterType.TIMESTAMP_RANGE)
        self.min_timestamp = min_timestamp
        self.max_timestamp = max_timestamp

    def _in_range(self, timestamp: int) -> bool:
        """Return True when ``timestamp`` lies within the configured bounds."""
        # Compare against None explicitly: testing the bound's truthiness
        # would silently ignore a legitimate bound of 0.
        if self.min_timestamp is not None and timestamp < self.min_timestamp:
            return False
        if self.max_timestamp is not None and timestamp > self.max_timestamp:
            return False
        return True

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row passes.

        A disabled filter passes everything. Otherwise the row passes if
        any cell's ``"timestamp"`` is in range; a cell without a
        timestamp is treated as written now (current time in
        milliseconds). A row with no cells does not pass.
        """
        if not self.enabled:
            return True

        for cell_data in row_data.values():
            timestamp = cell_data.get("timestamp")
            if timestamp is None:
                timestamp = int(time.time() * 1000)
            if self._in_range(timestamp):
                return True

        return False

    def get_description(self) -> str:
        """Return ``TimestampRange(<min> - <max>)``."""
        return f"TimestampRange({self.min_timestamp} - {self.max_timestamp})"

class CustomFunctionFilter(AdvancedFilter):
    """Filter rows with a caller-supplied predicate.

    The predicate receives ``(row_key, row_data)``. If it raises, the row
    is treated as not passing.
    """

    def __init__(self, filter_function: Callable[[str, Dict[str, Any]], bool], description: str = "Custom"):
        super().__init__(AdvancedFilterType.CUSTOM_FUNCTION)
        self.filter_function = filter_function
        self.description = description

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return the predicate's verdict, True when disabled, and False
        when the predicate raises."""
        if not self.enabled:
            return True

        predicate = self.filter_function
        try:
            verdict = predicate(row_key, row_data)
        except Exception:
            # Deliberate best-effort: a failing predicate rejects the row.
            return False
        return verdict

    def get_description(self) -> str:
        """Return ``CustomFunction(<description>)``."""
        return "CustomFunction({})".format(self.description)

class PageFilter(AdvancedFilter):
    """Filter that lets through only the first ``page_size`` rows it sees.

    The filter is stateful: each passing call to ``apply`` increments
    ``current_count``. Call ``reset`` to start a new page.
    """

    def __init__(self, page_size: int):
        super().__init__(AdvancedFilterType.PAGE_FILTER)
        self.page_size = page_size
        self.current_count = 0

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True while the page still has room (or when disabled).

        A disabled filter passes the row without counting it.
        """
        if not self.enabled:
            return True

        has_room = self.current_count < self.page_size
        if has_room:
            self.current_count += 1
        return has_room

    def reset(self):
        """Set the row counter back to zero."""
        self.current_count = 0

    def get_description(self) -> str:
        """Return ``Page(<page_size>)``."""
        return "Page({})".format(self.page_size)

class FilterChain:
    """Ordered collection of filters combined with one logical operator.

    Filters are evaluated in insertion order and evaluation stops as soon
    as the outcome is decided. This matters for stateful filters such as
    a page filter: placed last in an AND chain, it only counts rows that
    every earlier filter accepted.
    """

    def __init__(self, logical_operator: LogicalOperator = LogicalOperator.AND):
        self.filters: List[AdvancedFilter] = []
        self.logical_operator = logical_operator

    def add_filter(self, filter_obj: AdvancedFilter) -> 'FilterChain':
        """Append a filter and return the chain for fluent use."""
        self.filters.append(filter_obj)
        return self

    def remove_filter(self, filter_obj: AdvancedFilter) -> 'FilterChain':
        """Remove a filter if present; a missing filter is ignored."""
        if filter_obj in self.filters:
            self.filters.remove(filter_obj)
        return self

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row passes the chain.

        Only enabled filters take part; a chain with none passes every
        row. AND requires all filters to pass, OR requires at least one,
        and NOT passes when not all of them pass.
        """
        active = [f for f in self.filters if f.enabled]
        if not active:
            return True

        # Lazy generator: all()/any() stop at the first deciding filter,
        # so later (possibly stateful) filters are not invoked for rows
        # whose outcome is already known.
        outcomes = (f.apply(row_key, row_data) for f in active)

        if self.logical_operator == LogicalOperator.AND:
            return all(outcomes)
        if self.logical_operator == LogicalOperator.OR:
            return any(outcomes)
        return not all(outcomes)  # NOT

    def get_description(self) -> str:
        """Describe the enabled filters joined by the logical operator.

        Returns ``"EmptyFilterChain"`` when the chain holds no filters.
        """
        if not self.filters:
            return "EmptyFilterChain"

        separator = f" {self.logical_operator.value} "
        descriptions = [f.get_description() for f in self.filters if f.enabled]
        return f"FilterChain({separator.join(descriptions)})"

    def get_statistics(self) -> Dict[str, Any]:
        """Return filter counts, the logical operator and the filter types."""
        enabled_count = sum(1 for f in self.filters if f.enabled)
        return {
            "total_filters": len(self.filters),
            "enabled_filters": enabled_count,
            "disabled_filters": len(self.filters) - enabled_count,
            "logical_operator": self.logical_operator.value,
            "filter_types": [f.filter_type.value for f in self.filters]
        }

# 高级过滤器使用示例
print("\n=== 高级过滤器示例 ===")

# 创建测试数据
test_data = {
    "user001": {
        "info:name": {"value": "张三", "timestamp": 1640995200000},
        "info:age": {"value": "25", "timestamp": 1640995200000},
        "stats:login_count": {"value": "100", "timestamp": 1640995200000}
    },
    "user002": {
        "info:name": {"value": "李四", "timestamp": 1640995300000},
        "info:age": {"value": "30", "timestamp": 1640995300000},
        "stats:login_count": {"value": "50", "timestamp": 1640995300000}
    },
    "admin001": {
        "info:name": {"value": "管理员", "timestamp": 1640995400000},
        "info:age": {"value": "35", "timestamp": 1640995400000},
        "stats:login_count": {"value": "200", "timestamp": 1640995400000}
    }
}

# 1. 单列值过滤器
age_filter = SingleColumnValueFilter("info", "age", ComparisonOperator.GREATER, "25")
print(f"年龄大于25的用户:")
for row_key, row_data in test_data.items():
    if age_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        age = row_data["info:age"]["value"]
        print(f"  {row_key}: {name}, 年龄: {age}")

# 2. 多列值过滤器
conditions = [
    {"column_family": "info", "qualifier": "age", "operator": ">", "value": "20"},
    {"column_family": "stats", "qualifier": "login_count", "operator": ">", "value": "80"}
]
multi_filter = MultipleColumnValueFilter(conditions, LogicalOperator.AND)
print(f"\n年龄大于20且登录次数大于80的用户:")
for row_key, row_data in test_data.items():
    if multi_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        age = row_data["info:age"]["value"]
        login_count = row_data["stats:login_count"]["value"]
        print(f"  {row_key}: {name}, 年龄: {age}, 登录次数: {login_count}")

# 3. 行键正则表达式过滤器
row_key_filter = RowKeyRegexFilter(r"user\d+")
print(f"\n行键匹配'user\\d+'的记录:")
for row_key, row_data in test_data.items():
    if row_key_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        print(f"  {row_key}: {name}")

# 4. 自定义函数过滤器
def custom_filter_func(row_key: str, row_data: Dict[str, Any]) -> bool:
    """Keep rows whose ``info:name`` value contains '三' or '四'.

    Args:
        row_key: Row key of the candidate row (not inspected).
        row_data: Mapping of "family:qualifier" to a cell dict holding "value".

    Returns:
        True when the name contains either character; False otherwise,
        including when the name cell or its value is missing.
    """
    name_cell = row_data.get("info:name", {})
    name = name_cell.get("value", "")
    return any(marker in name for marker in ("三", "四"))

# Wrap the predicate above in a filter object and apply it to every row
custom_filter = CustomFunctionFilter(custom_filter_func, "名字包含'三'或'四'")
custom_description = custom_filter.get_description()
print(f"\n{custom_description}的用户:")
for row_key, row_data in test_data.items():
    if not custom_filter.apply(row_key, row_data):
        continue
    name = row_data["info:name"]["value"]
    print(f"  {row_key}: {name}")

# 5. Filter chain: row key must match AND age must be >= "25"
filter_chain = FilterChain(LogicalOperator.AND)
(
    filter_chain
    .add_filter(RowKeyRegexFilter(r"user\d+"))
    .add_filter(SingleColumnValueFilter("info", "age", ComparisonOperator.GREATER_OR_EQUAL, "25"))
)

print(f"\n过滤器链结果 ({filter_chain.get_description()}):")
for row_key, row_data in test_data.items():
    if not filter_chain.apply(row_key, row_data):
        continue
    name = row_data["info:name"]["value"]
    age = row_data["info:age"]["value"]
    print(f"  {row_key}: {name}, 年龄: {age}")

# Show the statistics gathered by the filter chain
stats = filter_chain.get_statistics()
print(f"\n过滤器链统计: {stats}")

3. 计数器操作

3.1 原子计数器

class CounterOperationType(Enum):
    """Kinds of operation that HBaseCounter can apply to a counter cell."""
    # Add the operation's value to the current counter value.
    INCREMENT = "increment"
    # Subtract the operation's value from the current counter value.
    DECREMENT = "decrement"
    # Overwrite the counter with the operation's value.
    SET = "set"
    # Read the counter without storing anything.
    GET = "get"
    # Put the counter back to 0.
    RESET = "reset"
    # Overwrite only when the current value equals the expected value.
    COMPARE_AND_SET = "compare_and_set"

class CounterDataType(Enum):
    """Numeric representation requested for a counter value."""
    # HBaseCounter converts the value with int().
    LONG = "long"
    # HBaseCounter converts the value with float().
    DOUBLE = "double"
    # HBaseCounter applies no conversion for this member.
    BIG_INTEGER = "big_integer"

@dataclass
class CounterOperation:
    """Description of one counter operation to be executed by HBaseCounter."""
    # Which operation to perform.
    operation_type: CounterOperationType
    # Table the counter belongs to.
    table_name: str
    # row_key, column_family and qualifier together identify the counter cell.
    row_key: str
    column_family: str
    qualifier: str
    # Delta for increment/decrement; new value for set and compare-and-set.
    value: Union[int, float] = 0
    # Only consulted by compare-and-set: the value the counter must hold.
    expected_value: Optional[Union[int, float]] = None
    # Numeric type used when converting the resulting value.
    data_type: CounterDataType = CounterDataType.LONG
    # Identifier copied into the resulting CounterResult.
    operation_id: Optional[str] = None
    # Not read by the counter code in this section.
    timestamp: Optional[int] = None

@dataclass
class CounterResult:
    """Outcome of one executed counter operation."""
    # Identifier of the operation that produced this result.
    operation_id: str
    # False when the operation raised or a compare-and-set did not match.
    success: bool
    operation_type: CounterOperationType
    row_key: str
    # Column in "family:qualifier" form.
    column_name: str
    # Counter value before the operation.
    old_value: Union[int, float]
    # Counter value reported after the operation.
    new_value: Union[int, float]
    # Elapsed time in seconds.
    execution_time: float
    # Failure description; None on success.
    error_message: Optional[str] = None

class HBaseCounter:
    """In-memory simulation of HBase atomic counter operations.

    Counter values are kept in ``self.counters`` under the key
    ``"row_key:column_family:qualifier"``. Every executed operation is
    appended to ``self.operation_history`` as a ``CounterResult``. Both
    structures are only read or modified while ``self._lock`` is held.
    """

    # Prefix of the generated operation ids ("inc_0", "set_1", "cas_2", ...).
    _ID_PREFIXES = {
        CounterOperationType.INCREMENT: "inc",
        CounterOperationType.DECREMENT: "dec",
        CounterOperationType.SET: "set",
        CounterOperationType.GET: "get",
        CounterOperationType.RESET: "reset",
        CounterOperationType.COMPARE_AND_SET: "cas",
    }

    def __init__(self, connection, table_name: str):
        """Create a counter client for ``table_name``.

        Args:
            connection: Stored on the instance; the simulation never uses it.
            table_name: Table name recorded in every generated operation.
        """
        self.connection = connection
        self.table_name = table_name
        self.counters: Dict[str, Union[int, float]] = {}  # simulated counter storage
        self.operation_history: List[CounterResult] = []
        self._lock = threading.Lock()
        # Monotonic id source. Deriving ids from len(operation_history) would
        # hand out duplicates under concurrency and after clear_history().
        self._operation_sequence = 0

    def increment(self, row_key: str, column_family: str, qualifier: str,
                 increment_value: Union[int, float] = 1,
                 data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
        """Add ``increment_value`` to the counter and return the result."""
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.INCREMENT, row_key, column_family,
                                qualifier, value=increment_value, data_type=data_type)
        )

    def decrement(self, row_key: str, column_family: str, qualifier: str,
                 decrement_value: Union[int, float] = 1,
                 data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
        """Subtract ``decrement_value`` from the counter and return the result."""
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.DECREMENT, row_key, column_family,
                                qualifier, value=decrement_value, data_type=data_type)
        )

    def set_counter(self, row_key: str, column_family: str, qualifier: str,
                   value: Union[int, float],
                   data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
        """Overwrite the counter with ``value`` and return the result."""
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.SET, row_key, column_family,
                                qualifier, value=value, data_type=data_type)
        )

    def get_counter(self, row_key: str, column_family: str, qualifier: str) -> CounterResult:
        """Read the counter; ``new_value`` of the result is the stored value."""
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.GET, row_key, column_family, qualifier)
        )

    def reset_counter(self, row_key: str, column_family: str, qualifier: str) -> CounterResult:
        """Set the counter back to 0 and return the result."""
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.RESET, row_key, column_family,
                                qualifier, value=0)
        )

    def compare_and_set(self, row_key: str, column_family: str, qualifier: str,
                       expected_value: Union[int, float], new_value: Union[int, float],
                       data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
        """Store ``new_value`` only if the counter currently equals ``expected_value``.

        On a mismatch the counter is left untouched and the result has
        ``success=False`` with an explanatory ``error_message``.
        """
        return self._execute_counter_operation(
            self._new_operation(CounterOperationType.COMPARE_AND_SET, row_key, column_family,
                                qualifier, value=new_value, expected_value=expected_value,
                                data_type=data_type)
        )

    def _new_operation(self, operation_type: CounterOperationType, row_key: str,
                       column_family: str, qualifier: str,
                       value: Union[int, float] = 0,
                       expected_value: Optional[Union[int, float]] = None,
                       data_type: CounterDataType = CounterDataType.LONG) -> CounterOperation:
        """Build a CounterOperation for this table with a unique operation id."""
        with self._lock:
            sequence = self._operation_sequence
            self._operation_sequence += 1
        return CounterOperation(
            operation_type=operation_type,
            table_name=self.table_name,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=value,
            expected_value=expected_value,
            data_type=data_type,
            operation_id=f"{self._ID_PREFIXES[operation_type]}_{sequence}",
        )

    @staticmethod
    def _coerce_value(value: Union[int, float], data_type: CounterDataType) -> Union[int, float]:
        """Convert ``value`` to the representation requested by ``data_type``."""
        if data_type == CounterDataType.LONG:
            return int(value)
        if data_type == CounterDataType.DOUBLE:
            return float(value)
        # BIG_INTEGER (and anything else) is stored unchanged.
        return value

    def _execute_counter_operation(self, operation: CounterOperation) -> CounterResult:
        """Apply ``operation`` under the lock and record its result.

        Any exception raised while computing the new value is converted into
        a failed ``CounterResult`` rather than propagated. In that case the
        counter is unchanged and both ``old_value`` and ``new_value`` report
        the stored value.

        Returns:
            The ``CounterResult`` that was appended to ``operation_history``.
        """
        start_time = time.perf_counter()
        counter_key = f"{operation.row_key}:{operation.column_family}:{operation.qualifier}"
        op_type = operation.operation_type

        with self._lock:
            old_value = self.counters.get(counter_key, 0)
            new_value = old_value
            success = True
            error_message = None

            try:
                if op_type == CounterOperationType.INCREMENT:
                    new_value = old_value + operation.value
                elif op_type == CounterOperationType.DECREMENT:
                    new_value = old_value - operation.value
                elif op_type == CounterOperationType.SET:
                    new_value = operation.value
                elif op_type == CounterOperationType.RESET:
                    new_value = 0
                elif op_type == CounterOperationType.COMPARE_AND_SET:
                    if old_value == operation.expected_value:
                        new_value = operation.value
                    else:
                        success = False
                        error_message = f"期望值 {operation.expected_value} 不匹配当前值 {old_value}"

                # Only operations that store a value are type-converted.
                # Converting on GET (or a failed compare-and-set) would
                # report a truncated value for counters stored as DOUBLE,
                # because those calls carry the default LONG data type.
                if success and op_type != CounterOperationType.GET:
                    new_value = self._coerce_value(new_value, operation.data_type)
                    self.counters[counter_key] = new_value
            except Exception as e:
                # Deliberate catch-all: callers receive a failed result
                # instead of an exception.
                success = False
                error_message = str(e)
                new_value = old_value

            result = CounterResult(
                operation_id=operation.operation_id,
                success=success,
                operation_type=op_type,
                row_key=operation.row_key,
                column_name=f"{operation.column_family}:{operation.qualifier}",
                old_value=old_value,
                new_value=new_value,
                execution_time=time.perf_counter() - start_time,
                error_message=error_message,
            )
            self.operation_history.append(result)
            return result

    def batch_increment(self, operations: List[Dict[str, Any]]) -> List[CounterResult]:
        """Increment several counters, one after another.

        Args:
            operations: Dicts with the required keys 'row_key',
                'column_family' and 'qualifier', plus optional 'value'
                (default 1) and 'data_type' (default 'long').

        Returns:
            One ``CounterResult`` per input dict, in input order.
        """
        return [
            self.increment(
                row_key=op_data['row_key'],
                column_family=op_data['column_family'],
                qualifier=op_data['qualifier'],
                increment_value=op_data.get('value', 1),
                data_type=CounterDataType(op_data.get('data_type', 'long')),
            )
            for op_data in operations
        ]

    def get_all_counters(self) -> Dict[str, Union[int, float]]:
        """Return a copy of all counter values keyed by counter key."""
        with self._lock:
            return self.counters.copy()

    def get_counter_statistics(self) -> Dict[str, Any]:
        """Summarise the recorded operation history.

        Returns:
            Dict with operation totals, success rate in percent, number of
            counters, average execution time in seconds and a per-type
            operation count.
        """
        # Snapshot under the lock so concurrent operations cannot change the
        # history while it is being aggregated.
        with self._lock:
            history = list(self.operation_history)
            total_counters = len(self.counters)

        total = len(history)
        divisor = max(total, 1)  # avoids division by zero on an empty history
        successful = sum(1 for r in history if r.success)

        return {
            "total_operations": total,
            "successful_operations": successful,
            "failed_operations": total - successful,
            "success_rate": round(successful / divisor * 100, 2),
            "total_counters": total_counters,
            "avg_execution_time": round(sum(r.execution_time for r in history) / divisor, 4),
            "operation_types": {op_type.value: sum(1 for r in history if r.operation_type == op_type)
                              for op_type in CounterOperationType}
        }

    def clear_history(self):
        """Discard the recorded operation history."""
        with self._lock:
            self.operation_history.clear()

    def clear_counters(self):
        """Remove every counter value."""
        with self._lock:
            self.counters.clear()

# Counter usage example
print("\n=== HBase计数器操作示例 ===")

# Create the counter client; None stands in for a real connection.
counter_client = HBaseCounter(None, "user_stats")

# Basic counter operations
print("1. 基本计数器操作:")

# Increase the login count by 1
result1 = counter_client.increment("user001", "stats", "login_count", increment_value=1)
print(f"  增加登录次数: {result1.old_value} -> {result1.new_value}")

# Increase the same counter again, this time by 5
result2 = counter_client.increment("user001", "stats", "login_count", increment_value=5)
print(f"  再次增加: {result2.old_value} -> {result2.new_value}")

# Set the page-view counter to an absolute value
result3 = counter_client.set_counter("user001", "stats", "page_views", value=100)
print(f"  设置页面浏览次数: {result3.old_value} -> {result3.new_value}")

# Read the current login count
result4 = counter_client.get_counter("user001", "stats", "login_count")
print(f"  当前登录次数: {result4.new_value}")

# Compare-and-set: replace 6 with 10 only if the counter is currently 6
result5 = counter_client.compare_and_set(
    "user001", "stats", "login_count", expected_value=6, new_value=10
)
print(f"  比较并设置 (期望6->10): 成功={result5.success}, {result5.old_value} -> {result5.new_value}")

# Batch operations
print("\n2. 批量计数器操作:")
batch_operations = [
    {"row_key": batch_row, "column_family": "stats", "qualifier": batch_qualifier, "value": batch_delta}
    for batch_row, batch_qualifier, batch_delta in (
        ("user002", "login_count", 3),
        ("user002", "page_views", 50),
        ("user003", "login_count", 1),
    )
]

batch_results = counter_client.batch_increment(batch_operations)
for result in batch_results:
    print(f"  {result.row_key} {result.column_name}: {result.old_value} -> {result.new_value}")

# List every counter
print("\n3. 所有计数器值:")
all_counters = counter_client.get_all_counters()
for counter_key, value in all_counters.items():
    print(f"  {counter_key}: {value}")

# Show the aggregated statistics
stats = counter_client.get_counter_statistics()
print(f"\n4. 计数器统计信息: {stats}")

3.2 分布式计数器

class DistributedCounter:
    """Sharded counter simulation.

    Each increment lands on one randomly chosen shard. The counter's value
    is the sum over all shards. Shard values are kept in ``self.counters``
    under keys of the form ``"<counter_name>_shard_<n>"`` and are only
    touched while ``self._lock`` is held.
    """

    def __init__(self, connection, table_name: str, counter_name: str, shard_count: int = 10):
        """Create a counter split across ``shard_count`` shards.

        Args:
            connection: Stored on the instance; the simulation never uses it.
            table_name: Stored on the instance.
            counter_name: Prefix of every shard key.
            shard_count: Number of shards; must be at least 1.

        Raises:
            ValueError: If ``shard_count`` is smaller than 1.
        """
        # Fail at construction time; otherwise the error would only surface
        # later, from the random shard selection inside increment().
        if shard_count < 1:
            raise ValueError(f"shard_count 必须大于等于 1, 当前值: {shard_count}")

        self.connection = connection
        self.table_name = table_name
        self.counter_name = counter_name
        self.shard_count = shard_count
        self.counters = {}  # simulated shard storage
        self._lock = threading.Lock()

    def increment(self, increment_value: int = 1) -> int:
        """Add ``increment_value`` to a random shard.

        Returns:
            The total across all shards immediately after this increment.
        """
        import random

        # Pick one shard at random.
        shard_id = random.randrange(self.shard_count)
        shard_key = f"{self.counter_name}_shard_{shard_id}"

        with self._lock:
            self.counters[shard_key] = self.counters.get(shard_key, 0) + increment_value
            # Sum inside the same critical section so the returned total
            # cannot include increments made by other threads afterwards.
            return sum(self.counters.values())

    def get_total_count(self) -> int:
        """Return the sum of all shard values."""
        with self._lock:
            return sum(self.counters.values())

    def get_shard_counts(self) -> Dict[str, int]:
        """Return a copy of the per-shard values keyed by shard key."""
        with self._lock:
            return self.counters.copy()

    def reset(self):
        """Drop every shard, bringing the total back to 0."""
        with self._lock:
            self.counters.clear()

# Distributed counter example
print("\n=== 分布式计数器示例 ===")

dist_counter = DistributedCounter(None, "global_stats", "total_visits", shard_count=5)

# Perform 20 increments and report the running total on every fifth one
# (1st, 6th, 11th and 16th).
for attempt in range(1, 21):
    total = dist_counter.increment(1)
    if attempt % 5 == 1:
        print(f"第{attempt}次增加后,总计数: {total}")

# Show how the increments were spread over the shards
shard_counts = dist_counter.get_shard_counts()
print("\n分片计数分布:")
for shard_key, count in shard_counts.items():
    print(f"  {shard_key}: {count}")

grand_total = dist_counter.get_total_count()
print(f"总计数: {grand_total}")

4. 协处理器

4.1 协处理器基础

class CoprocessorType(Enum):
    """Categories of coprocessor."""
    OBSERVER = "observer"      # observer coprocessor
    ENDPOINT = "endpoint"      # endpoint coprocessor
    REGION_OBSERVER = "region_observer"  # region-level observer
    MASTER_OBSERVER = "master_observer"  # master-level observer
    WAL_OBSERVER = "wal_observer"        # WAL observer

class CoprocessorEvent(Enum):
    """Events a coprocessor can be registered for.

    The data operations come as PRE_*/POST_* pairs, presumably fired before
    and after the operation. The region events have a single member each.
    """
    PRE_PUT = "pre_put"
    POST_PUT = "post_put"
    PRE_DELETE = "pre_delete"
    POST_DELETE = "post_delete"
    PRE_GET = "pre_get"
    POST_GET = "post_get"
    PRE_SCAN = "pre_scan"
    POST_SCAN = "post_scan"
    REGION_OPEN = "region_open"
    REGION_CLOSE = "region_close"

@dataclass
class CoprocessorContext:
    """Details of the operation handed to a coprocessor along with an event."""
    table_name: str
    region_name: str
    row_key: str
    column_family: str
    qualifier: str
    # Cell value of the operation; DataValidationCoprocessor passes it to the
    # validation functions.
    value: Any
    # The usage example passes epoch milliseconds here.
    timestamp: int
    # Operation name such as "put"; SecurityCoprocessor compares it against a
    # rule's allowed operations.
    operation_type: str
    # Caller details; the coprocessors read the "username" key.
    user_info: Dict[str, Any]

class HBaseCoprocessor(ABC):
    """Abstract base class shared by all coprocessors.

    Holds the coprocessor's identity, an on/off switch and the bookkeeping
    counters reported by ``get_statistics``. The base class itself only
    reads the counters; updating them is left to ``process`` implementations.
    """

    def __init__(self, coprocessor_type: CoprocessorType, name: str):
        """Store the identity and start enabled with all counters at zero."""
        self.name = name
        self.coprocessor_type = coprocessor_type
        self.enabled = True
        self.execution_count = 0
        self.error_count = 0
        self.total_execution_time = 0

    @abstractmethod
    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Handle ``event`` for the operation described by ``context``.

        Returns:
            A bool; CoprocessorManager.trigger_event stops dispatching as
            soon as a coprocessor returns False.
        """
        ...

    def get_statistics(self) -> Dict[str, Any]:
        """Return identity, state and derived rates as a dict.

        ``success_rate`` is a percentage and ``avg_execution_time`` is the
        accumulated time divided by the execution count.
        """
        executions = self.execution_count
        divisor = max(executions, 1)  # avoids division by zero before the first run
        success_ratio = (executions - self.error_count) / divisor

        return {
            "name": self.name,
            "type": self.coprocessor_type.value,
            "enabled": self.enabled,
            "execution_count": executions,
            "error_count": self.error_count,
            "success_rate": round(success_ratio * 100, 2),
            "avg_execution_time": round(self.total_execution_time / divisor, 4),
        }

    def enable(self):
        """Switch the coprocessor on."""
        self.enabled = True

    def disable(self):
        """Switch the coprocessor off."""
        self.enabled = False

class AuditCoprocessor(HBaseCoprocessor):
    """Coprocessor that records an audit entry for every event it receives."""

    # Once more than MAX_AUDIT_LOGS entries have accumulated, only the newest
    # RETAINED_AUDIT_LOGS are kept.
    MAX_AUDIT_LOGS = 1000
    RETAINED_AUDIT_LOGS = 500

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "AuditCoprocessor")
        self.audit_logs: List[Dict[str, Any]] = []

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Append an audit entry describing ``event`` and ``context``.

        Returns:
            True when the entry was recorded or the coprocessor is disabled;
            False when recording raised an exception.
        """
        if not self.enabled:
            return True

        start_time = time.time()

        try:
            self.audit_logs.append({
                "timestamp": int(time.time() * 1000),
                "event": event.value,
                "table_name": context.table_name,
                "row_key": context.row_key,
                "column": f"{context.column_family}:{context.qualifier}",
                "operation_type": context.operation_type,
                "user": context.user_info.get("username", "unknown"),
                "region": context.region_name
            })

            # Bound memory use by dropping the oldest entries.
            if len(self.audit_logs) > self.MAX_AUDIT_LOGS:
                self.audit_logs = self.audit_logs[-self.RETAINED_AUDIT_LOGS:]

            return True

        except Exception as e:
            self.error_count += 1
            print(f"审计协处理器错误: {e}")
            return False

        finally:
            # Count every invocation, failed ones included. The base-class
            # success rate is (executions - errors) / executions and would
            # turn negative if errors were not also counted as executions.
            self.execution_count += 1
            self.total_execution_time += time.time() - start_time

    def get_audit_logs(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the most recent audit entries.

        A non-positive ``limit`` yields an empty list.
        """
        # Guard needed because a [-0:] slice would return the whole list.
        if limit <= 0:
            return []
        return self.audit_logs[-limit:]

    def search_audit_logs(self, **filters) -> List[Dict[str, Any]]:
        """Return the audit entries matching every given ``field=value`` filter.

        Filter names that are not fields of an entry are ignored for that
        entry, so they never exclude it.
        """
        return [
            log for log in self.audit_logs
            if all(log[key] == value for key, value in filters.items() if key in log)
        ]

class SecurityCoprocessor(HBaseCoprocessor):
    """Coprocessor that enforces per-table, per-column-family access rules."""

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "SecurityCoprocessor")
        # Rules keyed by "table_name:column_family".
        self.access_rules: Dict[str, Dict[str, Any]] = {}
        self.blocked_operations: List[Dict[str, Any]] = []

    def add_access_rule(self, table_name: str, column_family: str,
                       allowed_users: List[str], allowed_operations: List[str]):
        """Register, or replace, the rule for ``table_name:column_family``."""
        rule_key = f"{table_name}:{column_family}"
        self.access_rules[rule_key] = {
            "allowed_users": allowed_users,
            "allowed_operations": allowed_operations
        }

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Check the operation in ``context`` against the matching rule.

        Operations on a table/column family without a rule are allowed.

        Returns:
            True when the operation is allowed or the coprocessor is
            disabled; False when it is blocked or the check itself raised.
        """
        if not self.enabled:
            return True

        start_time = time.time()

        try:
            rule = self.access_rules.get(f"{context.table_name}:{context.column_family}")

            if rule is not None:
                username = context.user_info.get("username", "unknown")

                # The user must be listed in the rule.
                if username not in rule["allowed_users"]:
                    self._block_operation("用户无权限", context)
                    return False

                # The operation must be listed in the rule.
                if context.operation_type not in rule["allowed_operations"]:
                    self._block_operation("操作不被允许", context)
                    return False

            return True

        except Exception as e:
            self.error_count += 1
            print(f"安全协处理器错误: {e}")
            return False

        finally:
            # Count every invocation, including rejected and failed ones, so
            # execution_count reflects how often the check ran and the
            # base-class success rate cannot become negative.
            self.execution_count += 1
            self.total_execution_time += time.time() - start_time

    def _block_operation(self, reason: str, context: CoprocessorContext):
        """Record a rejected operation together with the rejection reason."""
        self.blocked_operations.append({
            "timestamp": int(time.time() * 1000),
            "reason": reason,
            "table_name": context.table_name,
            "row_key": context.row_key,
            "column": f"{context.column_family}:{context.qualifier}",
            "operation_type": context.operation_type,
            "user": context.user_info.get("username", "unknown")
        })

    def get_blocked_operations(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the most recently blocked operations.

        A non-positive ``limit`` yields an empty list.
        """
        # Guard needed because a [-0:] slice would return the whole list.
        if limit <= 0:
            return []
        return self.blocked_operations[-limit:]

class DataValidationCoprocessor(HBaseCoprocessor):
    """Coprocessor that validates cell values on PRE_PUT events."""

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "DataValidationCoprocessor")
        # Maps a column pattern to the function validating matching values.
        self.validation_rules: Dict[str, Callable] = {}
        self.validation_errors: List[Dict[str, Any]] = []

    def add_validation_rule(self, column_pattern: str, validation_func: Callable[[Any], bool]):
        """Register ``validation_func`` for columns matching ``column_pattern``.

        A pattern matches when it is a substring of "family:qualifier";
        the pattern "*" matches every column.
        """
        self.validation_rules[column_pattern] = validation_func

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Run every matching validation rule against ``context.value``.

        Events other than PRE_PUT are passed through without validation.

        Returns:
            True when all matching rules accept the value, or nothing was
            validated; False when a rule rejects the value or raises.
        """
        if not self.enabled or event is not CoprocessorEvent.PRE_PUT:
            return True

        start_time = time.time()

        try:
            column_name = f"{context.column_family}:{context.qualifier}"

            for pattern, validation_func in self.validation_rules.items():
                if pattern == "*" or pattern in column_name:
                    if not validation_func(context.value):
                        self._record_validation_error(f"验证规则 '{pattern}' 失败", context)
                        return False

            return True

        except Exception as e:
            self.error_count += 1
            print(f"数据验证协处理器错误: {e}")
            return False

        finally:
            # Count every validation run, including rejected and failed
            # ones, so execution_count reflects how often validation ran and
            # the base-class success rate cannot become negative.
            self.execution_count += 1
            self.total_execution_time += time.time() - start_time

    def _record_validation_error(self, reason: str, context: CoprocessorContext):
        """Record a rejected value together with the rejection reason."""
        self.validation_errors.append({
            "timestamp": int(time.time() * 1000),
            "reason": reason,
            "table_name": context.table_name,
            "row_key": context.row_key,
            "column": f"{context.column_family}:{context.qualifier}",
            "value": str(context.value),
            "user": context.user_info.get("username", "unknown")
        })

    def get_validation_errors(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the most recent validation errors.

        A non-positive ``limit`` yields an empty list.
        """
        # Guard needed because a [-0:] slice would return the whole list.
        if limit <= 0:
            return []
        return self.validation_errors[-limit:]

class CoprocessorManager:
    """Registry that dispatches events to the coprocessors subscribed to them."""

    def __init__(self):
        self.coprocessors: List[HBaseCoprocessor] = []
        self.event_handlers: Dict[CoprocessorEvent, List[HBaseCoprocessor]] = {}

    def register_coprocessor(self, coprocessor: HBaseCoprocessor, events: List[CoprocessorEvent]):
        """Subscribe ``coprocessor`` to ``events``.

        May be called repeatedly for the same coprocessor to add further
        events. The coprocessor is listed once overall and once per event,
        so it is neither reported twice in the statistics nor invoked twice
        for one event.
        """
        if coprocessor not in self.coprocessors:
            self.coprocessors.append(coprocessor)

        for event in events:
            handlers = self.event_handlers.setdefault(event, [])
            if coprocessor not in handlers:
                handlers.append(coprocessor)

    def unregister_coprocessor(self, coprocessor: HBaseCoprocessor):
        """Remove ``coprocessor`` from the registry and from every event."""
        if coprocessor in self.coprocessors:
            self.coprocessors.remove(coprocessor)

        for event_list in self.event_handlers.values():
            if coprocessor in event_list:
                event_list.remove(coprocessor)

    def trigger_event(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Dispatch ``event`` to its enabled coprocessors in registration order.

        Returns:
            False as soon as one coprocessor returns False (the remaining
            ones are not invoked); True otherwise, including when nothing is
            subscribed to ``event``.
        """
        for coprocessor in self.event_handlers.get(event, ()):
            if coprocessor.enabled and not coprocessor.process(event, context):
                return False

        return True

    def get_coprocessor_statistics(self) -> List[Dict[str, Any]]:
        """Return the statistics dict of every registered coprocessor."""
        return [cp.get_statistics() for cp in self.coprocessors]

    def enable_all(self):
        """Enable every registered coprocessor."""
        for coprocessor in self.coprocessors:
            coprocessor.enable()

    def disable_all(self):
        """Disable every registered coprocessor."""
        for coprocessor in self.coprocessors:
            coprocessor.disable()

# Coprocessor usage example
print("\n=== HBase协处理器示例 ===")

# The manager dispatches events to the registered coprocessors
cp_manager = CoprocessorManager()

# Create one instance of each coprocessor
audit_cp = AuditCoprocessor()
security_cp = SecurityCoprocessor()
validation_cp = DataValidationCoprocessor()

# Register each coprocessor for the events it should receive; registration
# order is the order in which they are invoked for a shared event.
for coprocessor, subscribed_events in (
    (audit_cp, [CoprocessorEvent.PRE_PUT, CoprocessorEvent.POST_PUT, CoprocessorEvent.PRE_DELETE]),
    (security_cp, [CoprocessorEvent.PRE_PUT, CoprocessorEvent.PRE_DELETE, CoprocessorEvent.PRE_GET]),
    (validation_cp, [CoprocessorEvent.PRE_PUT]),
):
    cp_manager.register_coprocessor(coprocessor, subscribed_events)

# Configure the security rules
security_cp.add_access_rule(
    table_name="users",
    column_family="info",
    allowed_users=["admin", "user1"],
    allowed_operations=["put", "get"],
)
security_cp.add_access_rule(
    table_name="users",
    column_family="sensitive",
    allowed_users=["admin"],
    allowed_operations=["put", "get", "delete"],
)

# 配置验证规则
def validate_age(value):
    """Return True when ``value`` converts to an integer between 0 and 150.

    Values that cannot be converted with ``int()`` are rejected, not raised.
    """
    try:
        age = int(value)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "invalid"; a bare except would also
        # swallow KeyboardInterrupt and SystemExit.
        return False
    return 0 <= age <= 150

def validate_email(value):
    """Return True when ``str(value)`` has the shape ``local@domain.tld``.

    The whole string must match: ``re.fullmatch`` is used because ``$`` with
    ``re.match`` also accepts a trailing newline such as "a@b.com\\n".
    """
    import re
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return re.fullmatch(pattern, str(value)) is not None

# Attach each validator to the column pattern it applies to
for column_pattern, validator in (("age", validate_age), ("email", validate_email)):
    validation_cp.add_validation_rule(column_pattern, validator)

# Simulated operations
print("1. 模拟数据操作:")

# Build the test contexts: every one is a "put" on table "users" in "region1"
contexts = [
    CoprocessorContext(
        table_name="users",
        region_name="region1",
        row_key=ctx_row_key,
        column_family=ctx_family,
        qualifier=ctx_qualifier,
        value=ctx_value,
        timestamp=int(time.time() * 1000),
        operation_type="put",
        user_info={"username": ctx_username},
    )
    for ctx_row_key, ctx_family, ctx_qualifier, ctx_value, ctx_username in (
        ("user001", "info", "age", "25", "admin"),
        ("user002", "info", "email", "invalid-email", "user1"),
        # user1 is not in the allowed users of the "sensitive" family
        ("user003", "sensitive", "ssn", "123-45-6789", "user1"),
    )
]

# Fire PRE_PUT for each context; POST_PUT follows only when PRE_PUT passed
for position, context in enumerate(contexts, start=1):
    print(f"\n  操作 {position}: {context.operation_type} {context.table_name}:{context.column_family}:{context.qualifier}")
    success = cp_manager.trigger_event(CoprocessorEvent.PRE_PUT, context)
    outcome = '成功' if success else '失败'
    print(f"    结果: {outcome}")

    if not success:
        continue
    cp_manager.trigger_event(CoprocessorEvent.POST_PUT, context)

# Coprocessor statistics
print("\n2. 协处理器统计信息:")
stats = cp_manager.get_coprocessor_statistics()
for stat in stats:
    print(f"  {stat['name']}: 执行{stat['execution_count']}次, 成功率{stat['success_rate']}%")

# Most recent audit log entries
print("\n3. 审计日志:")
audit_logs = audit_cp.get_audit_logs(5)
for log in audit_logs:
    print(f"  {log['timestamp']}: {log['event']} {log['table_name']} by {log['user']}")

# Operations rejected by the security coprocessor
print("\n4. 被阻止的操作:")
blocked_ops = security_cp.get_blocked_operations()
for op in blocked_ops:
    print(f"  {op['timestamp']}: {op['reason']} - {op['table_name']}:{op['column']} by {op['user']}")

# Values rejected by the validation coprocessor
print("\n5. 验证错误:")
validation_errors = validation_cp.get_validation_errors()
for error in validation_errors:
    print(f"  {error['timestamp']}: {error['reason']} - {error['column']}={error['value']}")

5. 异步操作

5.1 异步客户端

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable

class AsyncOperationType(Enum):
    """Kinds of operation the asynchronous client can be asked to perform."""
    PUT = "put"
    GET = "get"
    DELETE = "delete"
    SCAN = "scan"
    BATCH = "batch"
    INCREMENT = "increment"

class AsyncOperationStatus(Enum):
    """Lifecycle states of an asynchronous operation."""
    # Default state of a newly created AsyncOperation.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class AsyncOperation:
    """Bookkeeping record for one operation submitted to the async client."""
    # Key under which the client stores this record.
    operation_id: str
    operation_type: AsyncOperationType
    table_name: str
    # Operation arguments, e.g. row_key / column_family / qualifier / value.
    parameters: Dict[str, Any]
    status: AsyncOperationStatus = AsyncOperationStatus.PENDING
    # Outcome fields; presumably filled in by the code executing the
    # operation (not visible in this section).
    result: Optional[Any] = None
    error: Optional[str] = None
    start_time: Optional[float] = None
    end_time: Optional[float] = None
    # Optional callable supplied by the caller; how and when it is invoked is
    # not visible in this section.
    callback: Optional[Callable] = None

@dataclass
class AsyncResult:
    """Outcome of one asynchronous operation."""
    # Identifier of the operation this result belongs to.
    operation_id: str
    success: bool
    # Value produced by the operation.
    result: Any
    # Elapsed time; presumably seconds -- confirm against the producer.
    execution_time: float
    # Failure description; defaults to None.
    error_message: Optional[str] = None

class HBaseAsyncClient:
    """Asynchronous HBase client that runs blocking work on a thread pool.

    Every ``*_async`` coroutine registers an ``AsyncOperation`` in
    ``self.operations``, executes the (simulated) HBase call on
    ``self.executor`` and returns the operation id once that call has
    finished.  The optional callback is invoked exactly once per operation,
    on the worker thread, with the resulting ``AsyncResult``.
    """

    def __init__(self, connection, max_workers: int = 10):
        """Create the client.

        Args:
            connection: Connection handle; stored but not used by the
                simulated operations.
            max_workers: Size of the thread pool that runs the operations.
        """
        self.connection = connection
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.operations: Dict[str, AsyncOperation] = {}
        self.operation_counter = 0
        # Guards the id counter and the PENDING -> RUNNING/CANCELLED transition.
        self._lock = threading.Lock()

    async def put_async(self, table_name: str, row_key: str,
                        column_family: str, qualifier: str, value: Any,
                        callback: Optional[Callable] = None) -> str:
        """Run a PUT on the thread pool and return its operation id.

        Args:
            table_name: Target table.
            row_key: Row to write.
            column_family: Column family of the cell.
            qualifier: Column qualifier of the cell.
            value: Value to store.
            callback: Optional callable receiving the ``AsyncResult``.

        Returns:
            The id under which the operation is tracked.
        """
        parameters = {
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier,
            "value": value
        }
        return await self._submit(
            AsyncOperationType.PUT, table_name, parameters, callback, self._execute_put
        )

    async def get_async(self, table_name: str, row_key: str,
                        column_family: str, qualifier: str,
                        callback: Optional[Callable] = None) -> str:
        """Run a GET on the thread pool and return its operation id.

        The fetched cell is available afterwards through
        ``get_operation_status(operation_id).result`` or the callback.
        """
        parameters = {
            "row_key": row_key,
            "column_family": column_family,
            "qualifier": qualifier
        }
        return await self._submit(
            AsyncOperationType.GET, table_name, parameters, callback, self._execute_get
        )

    async def batch_async(self, operations: List[Dict[str, Any]],
                          callback: Optional[Callable] = None) -> str:
        """Run a batch of operations on the thread pool and return its id.

        Args:
            operations: Dicts that each carry at least ``"type"`` and
                ``"row_key"``.
            callback: Optional callable receiving the ``AsyncResult``.
        """
        return await self._submit(
            AsyncOperationType.BATCH, "multiple", {"operations": operations},
            callback, self._execute_batch
        )

    async def _submit(self, operation_type: AsyncOperationType, table_name: str,
                      parameters: Dict[str, Any], callback: Optional[Callable],
                      runner: Callable[[AsyncOperation], AsyncResult]) -> str:
        """Register an operation, run ``runner`` in the pool and await it."""
        operation = AsyncOperation(
            operation_id=self._generate_operation_id(),
            operation_type=operation_type,
            table_name=table_name,
            parameters=parameters,
            callback=callback
        )
        self.operations[operation.operation_id] = operation

        # Inside a coroutine the running loop is the only correct loop;
        # get_event_loop() is deprecated for this purpose.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(self.executor, runner, operation)
        return operation.operation_id

    def _execute_put(self, operation: AsyncOperation) -> AsyncResult:
        """Execute a PUT operation on the calling (worker) thread."""
        return self._run_operation(operation, self._simulate_put)

    def _execute_get(self, operation: AsyncOperation) -> AsyncResult:
        """Execute a GET operation on the calling (worker) thread."""
        return self._run_operation(operation, self._simulate_get)

    def _execute_batch(self, operation: AsyncOperation) -> AsyncResult:
        """Execute a batch operation on the calling (worker) thread."""
        return self._run_operation(operation, self._simulate_batch)

    def _run_operation(self, operation: AsyncOperation,
                       work: Callable[[Dict[str, Any]], Any]) -> AsyncResult:
        """Drive one operation through its lifecycle and notify the callback.

        ``work`` receives ``operation.parameters`` and returns the payload.
        An operation that was cancelled while still pending is not executed.
        """
        # Decide atomically whether the operation may start, so that a
        # concurrent cancel_operation() either wins completely or not at all.
        with self._lock:
            cancelled = operation.status == AsyncOperationStatus.CANCELLED
            if not cancelled:
                operation.status = AsyncOperationStatus.RUNNING
                operation.start_time = time.time()

        if cancelled:
            outcome = AsyncResult(
                operation_id=operation.operation_id,
                success=False,
                result=None,
                execution_time=0.0,
                error_message="operation cancelled before execution"
            )
            self._notify(operation, outcome)
            return outcome

        try:
            payload = work(operation.parameters)
        except Exception as e:
            # Timestamps are written before the status so that readers that
            # see a terminal status always find end_time populated.
            operation.end_time = time.time()
            operation.error = str(e)
            operation.status = AsyncOperationStatus.FAILED
            outcome = AsyncResult(
                operation_id=operation.operation_id,
                success=False,
                result=None,
                execution_time=operation.end_time - operation.start_time,
                error_message=str(e)
            )
        else:
            operation.end_time = time.time()
            operation.result = payload
            operation.status = AsyncOperationStatus.COMPLETED
            outcome = AsyncResult(
                operation_id=operation.operation_id,
                success=True,
                result=payload,
                execution_time=operation.end_time - operation.start_time
            )

        # The callback runs outside the try block: a failing callback must
        # not turn a completed operation into a failed one or fire twice.
        self._notify(operation, outcome)
        return outcome

    @staticmethod
    def _notify(operation: AsyncOperation, outcome: AsyncResult) -> None:
        """Invoke the operation's callback, reporting (not raising) its errors."""
        if operation.callback is None:
            return
        try:
            operation.callback(outcome)
        except Exception as e:
            print(f"回调执行错误: {e}")

    @staticmethod
    def _simulate_put(params: Dict[str, Any]) -> Dict[str, Any]:
        """Simulated PUT: sleep, then echo the written cell."""
        time.sleep(0.1)  # simulated network latency
        return {
            "row_key": params["row_key"],
            "column": f"{params['column_family']}:{params['qualifier']}",
            "value": params["value"],
            "timestamp": int(time.time() * 1000)
        }

    @staticmethod
    def _simulate_get(params: Dict[str, Any]) -> Dict[str, Any]:
        """Simulated GET: sleep, then return a synthetic cell."""
        time.sleep(0.05)  # simulated network latency
        return {
            "row_key": params["row_key"],
            "column": f"{params['column_family']}:{params['qualifier']}",
            "value": f"value_for_{params['row_key']}",
            "timestamp": int(time.time() * 1000)
        }

    @staticmethod
    def _simulate_batch(params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Simulated batch: sleep per item and report each as successful."""
        results = []
        for op in params["operations"]:
            time.sleep(0.02)  # simulated per-operation latency
            results.append({
                "operation_type": op["type"],
                "row_key": op["row_key"],
                "success": True
            })
        return results

    def _generate_operation_id(self) -> str:
        """Return a new id built from a counter and the current time in ms."""
        with self._lock:
            self.operation_counter += 1
            return f"async_op_{self.operation_counter}_{int(time.time() * 1000)}"

    def get_operation_status(self, operation_id: str) -> Optional[AsyncOperation]:
        """Return the tracked operation for ``operation_id``, or None."""
        return self.operations.get(operation_id)

    def cancel_operation(self, operation_id: str) -> bool:
        """Cancel an operation that has not started yet.

        Returns:
            True when the operation was still pending and is now cancelled,
            False when it is unknown or has already started.
        """
        operation = self.operations.get(operation_id)
        if operation is None:
            return False

        with self._lock:
            if operation.status != AsyncOperationStatus.PENDING:
                return False
            operation.status = AsyncOperationStatus.CANCELLED
            # Recorded so cleanup_completed_operations() can age it out.
            operation.end_time = time.time()
            return True

    def get_all_operations(self) -> List[AsyncOperation]:
        """Return a list of every tracked operation."""
        return list(self.operations.values())

    def get_statistics(self) -> Dict[str, Any]:
        """Summarise tracked operations.

        Returns:
            A dict with ``total_operations``, ``status_distribution`` (count
            per status value), ``avg_execution_time`` of completed
            operations in seconds and ``success_rate`` in percent.
        """
        operations = list(self.operations.values())

        status_counts = {status.value: 0 for status in AsyncOperationStatus}
        for op in operations:
            status_counts[op.status.value] += 1

        durations = [
            op.end_time - op.start_time
            for op in operations
            if op.status == AsyncOperationStatus.COMPLETED
            and op.start_time is not None
            and op.end_time is not None
        ]
        avg_execution_time = sum(durations) / len(durations) if durations else 0

        completed = status_counts[AsyncOperationStatus.COMPLETED.value]
        return {
            "total_operations": len(operations),
            "status_distribution": status_counts,
            "avg_execution_time": round(avg_execution_time, 4),
            "success_rate": round(completed / max(len(operations), 1) * 100, 2)
        }

    def cleanup_completed_operations(self, max_age_seconds: int = 3600):
        """Forget finished operations older than ``max_age_seconds``.

        Completed, failed and cancelled operations are removed once their
        ``end_time`` lies more than ``max_age_seconds`` in the past.
        """
        terminal_statuses = (
            AsyncOperationStatus.COMPLETED,
            AsyncOperationStatus.FAILED,
            AsyncOperationStatus.CANCELLED,
        )
        current_time = time.time()

        # Collect first; deleting while iterating the dict would raise.
        expired = [
            op_id
            for op_id, operation in self.operations.items()
            if operation.status in terminal_statuses
            and operation.end_time is not None
            and (current_time - operation.end_time) > max_age_seconds
        ]
        for op_id in expired:
            del self.operations[op_id]

    def close(self):
        """Shut the thread pool down, waiting for submitted work to finish."""
        self.executor.shutdown(wait=True)

# Usage example for the asynchronous client
async def async_example():
    """Demonstrate concurrent PUT/GET calls and a batch call.

    The client's thread pool is shut down even when one of the steps
    raises.
    """
    print("\n=== HBase异步操作示例 ===")

    # Create the asynchronous client
    async_client = HBaseAsyncClient(None, max_workers=5)

    # Callback that reports each finished operation
    def operation_callback(result: AsyncResult):
        print(f"  回调: 操作 {result.operation_id} {'成功' if result.success else '失败'}, 耗时 {result.execution_time:.4f}s")

    try:
        print("1. 异步PUT操作:")

        # Issue several PUT operations concurrently
        put_tasks = [
            async_client.put_async(
                table_name="async_test",
                row_key=f"row_{i}",
                column_family="data",
                qualifier="value",
                value=f"async_value_{i}",
                callback=operation_callback
            )
            for i in range(5)
        ]

        # Wait for every PUT to finish
        put_operation_ids = await asyncio.gather(*put_tasks)
        print(f"  启动了 {len(put_operation_ids)} 个PUT操作")

        # Pause between the demo stages
        await asyncio.sleep(0.5)

        print("\n2. 异步GET操作:")

        # Issue several GET operations concurrently
        get_tasks = [
            async_client.get_async(
                table_name="async_test",
                row_key=f"row_{i}",
                column_family="data",
                qualifier="value",
                callback=operation_callback
            )
            for i in range(3)
        ]

        get_operation_ids = await asyncio.gather(*get_tasks)
        print(f"  启动了 {len(get_operation_ids)} 个GET操作")

        await asyncio.sleep(0.3)

        print("\n3. 异步批量操作:")

        batch_operations = [
            {"type": "put", "row_key": "batch_row_1", "value": "batch_value_1"},
            {"type": "put", "row_key": "batch_row_2", "value": "batch_value_2"},
            {"type": "delete", "row_key": "batch_row_3"}
        ]

        batch_operation_id = await async_client.batch_async(
            operations=batch_operations,
            callback=operation_callback
        )
        print(f"  启动批量操作: {batch_operation_id}")

        await asyncio.sleep(0.2)

        # Show the status of the operations
        print("\n4. 操作状态:")
        all_operations = async_client.get_all_operations()
        for operation in all_operations[-5:]:  # only the last five operations
            print(f"  {operation.operation_id}: {operation.operation_type.value} - {operation.status.value}")

        # Show aggregate statistics
        stats = async_client.get_statistics()
        print(f"\n5. 异步客户端统计: {stats}")

        # Drop bookkeeping for old finished operations
        async_client.cleanup_completed_operations()
    finally:
        # Always release the worker threads, even after an error above
        async_client.close()

# Run the async example when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(async_example())

5.2 异步流处理

class AsyncStreamProcessor:
    """Groups queued operations into batches and submits them asynchronously.

    Operations added with ``add_operation`` are collected from
    ``processing_queue`` and handed to ``async_client.batch_async`` in
    batches of at most ``batch_size``; a summary of every submitted batch is
    placed on ``result_queue`` and printed by the result handler.
    """

    def __init__(self, async_client: HBaseAsyncClient, batch_size: int = 100,
                 poll_timeout: float = 1.0):
        """Create the processor.

        Args:
            async_client: Client used to submit the batches.
            batch_size: Number of queued operations that triggers a submit.
            poll_timeout: Seconds to wait on a queue before re-checking
                ``is_running``; also bounds how long ``stop_processing``
                takes to be noticed.
        """
        self.async_client = async_client
        self.batch_size = batch_size
        self.poll_timeout = poll_timeout
        self.processing_queue = asyncio.Queue()
        self.result_queue = asyncio.Queue()
        self.is_running = False
        self.processed_count = 0
        self.error_count = 0

    async def start_processing(self):
        """Run the batching loop and the result loop until stopped."""
        self.is_running = True

        # Both loops poll is_running, so they end after stop_processing()
        processing_task = asyncio.create_task(self._process_stream())
        result_task = asyncio.create_task(self._handle_results())

        await asyncio.gather(processing_task, result_task)

    async def add_operation(self, operation_data: Dict[str, Any]):
        """Queue one operation for batching."""
        await self.processing_queue.put(operation_data)

    async def _process_stream(self):
        """Collect queued operations into batches and submit them."""
        batch: List[Dict[str, Any]] = []

        while self.is_running:
            try:
                # Wait for the next operation, but wake up regularly
                operation_data = await asyncio.wait_for(
                    self.processing_queue.get(),
                    timeout=self.poll_timeout
                )

                batch.append(operation_data)

                # Submit when the batch is full or nothing else is waiting
                if len(batch) >= self.batch_size or self.processing_queue.empty():
                    batch = await self._flush(batch)

            except asyncio.TimeoutError:
                # Nothing arrived in time: submit whatever has accumulated
                batch = await self._flush(batch)
            except Exception as e:
                print(f"流处理错误: {e}")
                self.error_count += 1

        # Operations already taken off the queue must not be lost on stop
        await self._flush(batch)

    async def _flush(self, batch: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Submit ``batch`` if it is non-empty and return a fresh empty batch."""
        if batch:
            await self._process_batch(batch)
        # A new list, because the submitted one is now owned by the client
        return []

    async def _process_batch(self, batch: List[Dict[str, Any]]):
        """Submit one batch and queue a summary of it for the result loop."""
        try:
            operation_id = await self.async_client.batch_async(
                operations=batch,
                callback=self._batch_callback
            )

            await self.result_queue.put({
                "operation_id": operation_id,
                "batch_size": len(batch),
                "timestamp": time.time()
            })

        except Exception as e:
            print(f"批次处理错误: {e}")
            self.error_count += 1

    def _batch_callback(self, result: AsyncResult):
        """Update the counters from the outcome of one submitted batch."""
        if result.success:
            self.processed_count += len(result.result)
        else:
            self.error_count += 1

    async def _handle_results(self):
        """Print a line for every submitted batch until stopped."""
        while self.is_running:
            try:
                result = await asyncio.wait_for(
                    self.result_queue.get(),
                    timeout=self.poll_timeout
                )

                print(f"处理批次 {result['operation_id']}: {result['batch_size']} 个操作")

            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"结果处理错误: {e}")

    def stop_processing(self):
        """Ask both loops to finish after their current wait."""
        self.is_running = False

    def get_statistics(self) -> Dict[str, Any]:
        """Return the counters, the pending queue size and the running flag."""
        return {
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "queue_size": self.processing_queue.qsize(),
            "is_running": self.is_running
        }

# Stream processing example
async def stream_processing_example():
    """Feed twenty simulated operations through an AsyncStreamProcessor.

    The background task and the client's thread pool are cleaned up even
    when feeding the stream fails.
    """
    print("\n=== 异步流处理示例 ===")

    async_client = HBaseAsyncClient(None, max_workers=10)
    stream_processor = AsyncStreamProcessor(async_client, batch_size=5)

    # Run the stream processor in the background
    processing_task = asyncio.create_task(stream_processor.start_processing())

    try:
        # Simulate an incoming data stream
        print("1. 模拟数据流:")
        for i in range(20):
            operation_data = {
                "type": "put",
                "row_key": f"stream_row_{i}",
                "value": f"stream_value_{i}"
            }
            await stream_processor.add_operation(operation_data)

            if i % 5 == 0:
                print(f"  已添加 {i+1} 个操作到流")

            await asyncio.sleep(0.1)  # simulated arrival interval

        # Give the processor time to work through the queue
        await asyncio.sleep(2)

        # Show the statistics
        stats = stream_processor.get_statistics()
        print(f"\n2. 流处理统计: {stats}")
    finally:
        # Stop the loops and wait for the cancelled task to unwind before
        # the executor it may still be using is shut down.
        stream_processor.stop_processing()
        processing_task.cancel()
        await asyncio.gather(processing_task, return_exceptions=True)

        async_client.close()

# Run the stream processing example when this file is executed as a script
if __name__ == "__main__":
    asyncio.run(stream_processing_example())

6. 性能监控与调优

6.1 性能监控

class PerformanceMetric(Enum):
    """Kinds of performance metric that can be recorded."""
    # NOTE(review): PerformanceMonitor.record_operation only records
    # LATENCY; the other members have no producer in this section.
    LATENCY = "latency"
    THROUGHPUT = "throughput"
    ERROR_RATE = "error_rate"
    MEMORY_USAGE = "memory_usage"
    CPU_USAGE = "cpu_usage"
    CONNECTION_COUNT = "connection_count"

# ``field`` is needed for the default_factory below; importing it next to
# the snippet keeps it runnable regardless of earlier import lines.
from dataclasses import dataclass, field


@dataclass
class MetricData:
    """A single recorded metric sample."""
    metric_type: PerformanceMetric  # which metric this sample belongs to
    value: float  # measured value
    # NOTE(review): PerformanceMonitor stores epoch milliseconds here —
    # confirm for any other producer.
    timestamp: int
    # default_factory gives every sample its own dict instead of a shared one
    tags: Dict[str, str] = field(default_factory=dict)

class PerformanceMonitor:
    """Collects latency, throughput and error figures for client operations.

    Only the most recent ``window_size`` successful latencies and the most
    recent ``window_size`` samples per metric type are kept.  Writers and
    readers both synchronise on ``self._lock``.
    """

    def __init__(self, window_size: int = 1000):
        """Create an empty monitor keeping at most ``window_size`` samples."""
        self.window_size = window_size
        self.metrics: Dict[PerformanceMetric, List[MetricData]] = {}
        self.operation_times: List[float] = []
        self.error_count = 0
        self.total_operations = 0
        self._lock = threading.Lock()

    def record_operation(self, operation_type: str, execution_time: float, success: bool):
        """Record the outcome of one operation.

        Args:
            operation_type: Label stored in the sample's tags.
            execution_time: Duration of the operation in seconds.
            success: Whether the operation succeeded; failures count
                towards the error rate and are left out of latency stats.
        """
        with self._lock:
            self.total_operations += 1

            if success:
                self.operation_times.append(execution_time)

                # Keep only the newest window_size latencies
                if len(self.operation_times) > self.window_size:
                    del self.operation_times[:-self.window_size]
            else:
                self.error_count += 1

            # Every operation, failed or not, yields one latency sample
            self._add_metric(
                PerformanceMetric.LATENCY,
                execution_time,
                {"operation_type": operation_type, "success": str(success)}
            )

    def _add_metric(self, metric_type: "PerformanceMetric", value: float,
                    tags: Optional[Dict[str, str]] = None):
        """Append one sample for ``metric_type``.

        The caller is expected to hold ``self._lock``.
        """
        samples = self.metrics.setdefault(metric_type, [])
        samples.append(MetricData(
            metric_type=metric_type,
            value=value,
            timestamp=int(time.time() * 1000),
            tags=tags or {}
        ))

        # Keep only the newest window_size samples per metric type
        if len(samples) > self.window_size:
            del samples[:-self.window_size]

    def get_latency_stats(self) -> Dict[str, float]:
        """Return avg/min/max and p50/p95/p99 of the successful latencies.

        Percentiles use the nearest-rank method: the p-th percentile is the
        value at rank ``ceil(p/100 * n)`` of the sorted samples.  All
        values are zero when nothing has been recorded.
        """
        with self._lock:
            sorted_times = sorted(self.operation_times)

        if not sorted_times:
            return {"avg": 0, "min": 0, "max": 0, "p50": 0, "p95": 0, "p99": 0}

        n = len(sorted_times)

        def percentile(pct: int) -> float:
            # Integer form of ceil(n * pct / 100); avoids float rounding.
            rank = (n * pct + 99) // 100
            return round(sorted_times[max(rank, 1) - 1], 4)

        return {
            "avg": round(sum(sorted_times) / n, 4),
            "min": round(sorted_times[0], 4),
            "max": round(sorted_times[-1], 4),
            "p50": percentile(50),
            "p95": percentile(95),
            "p99": percentile(99)
        }

    def get_throughput(self, time_window_seconds: int = 60) -> float:
        """Return operations per second over the last ``time_window_seconds``.

        The count is divided by the full window length, even when the
        monitor has been recording for less time than that.

        Raises:
            ValueError: If ``time_window_seconds`` is not positive.
        """
        if time_window_seconds <= 0:
            raise ValueError("time_window_seconds must be positive")

        window_start = int(time.time() * 1000) - (time_window_seconds * 1000)

        with self._lock:
            # One LATENCY sample exists per recorded operation; samples of
            # other metric types are not operations and must not be counted.
            latency_samples = self.metrics.get(PerformanceMetric.LATENCY, [])
            recent_operations = sum(
                1 for sample in latency_samples if sample.timestamp >= window_start
            )

        return round(recent_operations / time_window_seconds, 2)

    def get_error_rate(self) -> float:
        """Return the share of failed operations in percent (0.0 if none ran)."""
        with self._lock:
            total = self.total_operations
            errors = self.error_count

        if total == 0:
            return 0.0
        return round(errors / total * 100, 2)

    def get_performance_summary(self) -> Dict[str, Any]:
        """Return latency stats, throughput, error rate and raw counters."""
        return {
            "latency_stats": self.get_latency_stats(),
            "throughput_ops_per_sec": self.get_throughput(),
            "error_rate_percent": self.get_error_rate(),
            "total_operations": self.total_operations,
            "error_count": self.error_count
        }

    def reset_metrics(self):
        """Discard every recorded sample and zero the counters."""
        with self._lock:
            self.metrics.clear()
            self.operation_times.clear()
            self.error_count = 0
            self.total_operations = 0

class MonitoredHBaseClient:
    """HBase client whose operations are timed and reported to a monitor.

    Cells live in an in-memory dict that stands in for HBase; every call
    is recorded on ``self.monitor`` with its duration and outcome.
    """

    def __init__(self, connection):
        """Keep the connection and start with an empty store and monitor."""
        self.connection = connection
        self.monitor = PerformanceMonitor()
        self.data_store = {}  # in-memory stand-in for the table data

    @staticmethod
    def _cell_key(table_name, row_key, column_family, qualifier):
        """Build the data_store key that addresses one cell."""
        return f"{table_name}:{row_key}:{column_family}:{qualifier}"

    def put(self, table_name: str, row_key: str, column_family: str, qualifier: str, value: Any) -> bool:
        """Store a cell value; return True unless the write raised."""
        began = time.time()
        success = True

        try:
            time.sleep(0.01)  # simulated network latency
            cell = {"value": value, "timestamp": int(time.time() * 1000)}
            self.data_store[self._cell_key(table_name, row_key, column_family, qualifier)] = cell
        except Exception as e:
            success = False
            print(f"PUT操作失败: {e}")
        finally:
            # Recorded on every path so failed calls are measured too
            self.monitor.record_operation("put", time.time() - began, success)

        return success

    def get(self, table_name: str, row_key: str, column_family: str, qualifier: str) -> Optional[Any]:
        """Return the stored cell dict, or None when absent or on error."""
        began = time.time()
        success = True
        result = None

        try:
            time.sleep(0.005)  # simulated network latency
            result = self.data_store.get(
                self._cell_key(table_name, row_key, column_family, qualifier)
            )
        except Exception as e:
            success = False
            print(f"GET操作失败: {e}")
        finally:
            self.monitor.record_operation("get", time.time() - began, success)

        return result

    def delete(self, table_name: str, row_key: str, column_family: str, qualifier: str) -> bool:
        """Remove a cell if present; return True unless the delete raised."""
        began = time.time()
        success = True

        try:
            time.sleep(0.008)  # simulated network latency
            # Deleting a missing cell is not an error
            self.data_store.pop(
                self._cell_key(table_name, row_key, column_family, qualifier), None
            )
        except Exception as e:
            success = False
            print(f"DELETE操作失败: {e}")
        finally:
            self.monitor.record_operation("delete", time.time() - began, success)

        return success

    def get_performance_report(self) -> Dict[str, Any]:
        """Return the monitor's performance summary."""
        return self.monitor.get_performance_summary()

    def reset_monitoring(self):
        """Clear everything the monitor has recorded."""
        self.monitor.reset_metrics()

# Performance monitoring example
print("\n=== HBase性能监控示例 ===")

# Create the monitored client
monitored_client = MonitoredHBaseClient(None)

print("1. 执行混合操作:")

# Run a mix of operations
for i in range(100):
    # PUT on every iteration
    monitored_client.put("test_table", f"row_{i}", "data", "value", f"test_value_{i}")

    # GET on every third iteration
    if i % 3 == 0:
        monitored_client.get("test_table", f"row_{i}", "data", "value")

    # DELETE on every tenth iteration; at i == 0 the key is "row_-5",
    # which was never written, so nothing is removed
    if i % 10 == 0:
        monitored_client.delete("test_table", f"row_{i-5}", "data", "value")

    # Simulate an occasional failed operation
    if i % 20 == 0:
        try:
            # Raise on purpose to exercise the failure path
            raise Exception("模拟错误")
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not swallowed
            monitored_client.monitor.record_operation("put", 0.1, False)

print("  完成100个操作")

# Fetch the performance report
print("\n2. 性能报告:")
report = monitored_client.get_performance_report()
for key, value in report.items():
    print(f"  {key}: {value}")

# Fetch the detailed latency statistics
print("\n3. 详细延迟统计:")
latency_stats = monitored_client.monitor.get_latency_stats()
for percentile, value in latency_stats.items():
    print(f"  {percentile}: {value}s")

print(f"\n4. 当前吞吐量: {monitored_client.monitor.get_throughput()} 操作/秒")
print(f"5. 错误率: {monitored_client.monitor.get_error_rate()}%")

6.2 性能调优建议

class PerformanceTuner:
    """Turns a PerformanceMonitor summary into findings and a plan."""

    def __init__(self, monitor: PerformanceMonitor):
        """Remember the monitor whose figures are analysed."""
        self.monitor = monitor

    def analyze_performance(self) -> Dict[str, Any]:
        """Check the current summary against fixed thresholds.

        Returns:
            A dict with the raw ``performance_summary``, the list of
            ``recommendations`` (issue, description, suggestions) and the
            ``overall_health`` score and level.
        """
        report = self.monitor.get_performance_summary()
        findings = []

        def flag(issue, description, suggestions):
            findings.append({
                "issue": issue,
                "description": description,
                "suggestions": suggestions
            })

        # Latency: an average above 100 ms is flagged
        latency = report["latency_stats"]
        if latency["avg"] > 0.1:
            flag(
                "高延迟",
                f"平均延迟 {latency['avg']}s 过高",
                [
                    "检查网络连接质量",
                    "增加连接池大小",
                    "考虑使用批量操作",
                    "优化行键设计以减少热点"
                ]
            )

        # Throughput: fewer than 100 ops/sec is flagged
        ops_per_sec = report["throughput_ops_per_sec"]
        if ops_per_sec < 100:
            flag(
                "低吞吐量",
                f"吞吐量 {ops_per_sec} ops/sec 过低",
                [
                    "增加并发连接数",
                    "使用异步操作",
                    "优化批量操作大小",
                    "检查RegionServer负载"
                ]
            )

        # Error rate: more than 5% is flagged
        failure_pct = report["error_rate_percent"]
        if failure_pct > 5:
            flag(
                "高错误率",
                f"错误率 {failure_pct}% 过高",
                [
                    "检查HBase集群状态",
                    "增加重试机制",
                    "检查网络稳定性",
                    "优化超时设置"
                ]
            )

        # Latency spread: p99 more than five times the average is flagged
        if latency["p99"] > latency["avg"] * 5:
            flag(
                "延迟分布不均",
                "P99延迟远高于平均延迟,存在性能抖动",
                [
                    "检查GC设置",
                    "优化RegionServer配置",
                    "检查磁盘I/O性能",
                    "考虑预分区策略"
                ]
            )

        return {
            "performance_summary": report,
            "recommendations": findings,
            "overall_health": self._calculate_health_score(report)
        }

    def _calculate_health_score(self, summary: Dict[str, Any]) -> Dict[str, Any]:
        """Derive a 0-100 score and a level label from a summary."""
        # Each dimension contributes a penalty that is subtracted from 100
        mean_latency = summary["latency_stats"]["avg"]
        latency_penalty = 20 if mean_latency > 0.1 else 10 if mean_latency > 0.05 else 0

        ops_per_sec = summary["throughput_ops_per_sec"]
        throughput_penalty = 30 if ops_per_sec < 50 else 15 if ops_per_sec < 100 else 0

        failure_pct = summary["error_rate_percent"]
        if failure_pct > 10:
            error_penalty = 40
        elif failure_pct > 5:
            error_penalty = 20
        elif failure_pct > 1:
            error_penalty = 10
        else:
            error_penalty = 0

        score = 100 - latency_penalty - throughput_penalty - error_penalty

        # The first floor the score reaches decides the level
        for floor, label in ((90, "优秀"), (70, "良好"), (50, "一般")):
            if score >= floor:
                health_level = label
                break
        else:
            health_level = "需要优化"

        return {
            "score": max(0, score),
            "level": health_level
        }

    def generate_optimization_plan(self, analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Turn an analysis into an ordered list of plan items.

        Every item carries the same priority, which is derived from the
        overall health score of the analysis.
        """
        findings = analysis["recommendations"]
        score = analysis["overall_health"]["score"]

        # A lower health score means a more urgent plan
        priority = "高" if score < 50 else "中" if score < 70 else "低"

        return [
            {
                "priority": priority,
                "order": position,
                "issue": finding["issue"],
                "description": finding["description"],
                "action_items": finding["suggestions"],
                "estimated_impact": self._estimate_impact(finding["issue"]),
                "implementation_difficulty": self._estimate_difficulty(finding["issue"])
            }
            for position, finding in enumerate(findings, start=1)
        ]

    def _estimate_impact(self, issue: str) -> str:
        """Return the expected impact label for an issue name."""
        return {
            "高延迟": "中等",
            "低吞吐量": "高",
            "高错误率": "高",
            "延迟分布不均": "中等"
        }.get(issue, "未知")

    def _estimate_difficulty(self, issue: str) -> str:
        """Return the implementation difficulty label for an issue name."""
        return {
            "高延迟": "中等",
            "低吞吐量": "中等",
            "高错误率": "低",
            "延迟分布不均": "高"
        }.get(issue, "未知")

# Performance tuning example
print("\n=== 性能调优分析示例 ===")

# Reuse the monitoring data collected by monitored_client
tuner = PerformanceTuner(monitored_client.monitor)

# Analyse the collected figures
analysis = tuner.analyze_performance()

print("1. 性能分析结果:")
print(f"  健康评分: {analysis['overall_health']['score']} ({analysis['overall_health']['level']})")

print("\n2. 发现的问题:")
for rec in analysis["recommendations"]:
    print(f"  问题: {rec['issue']}")
    print(f"  描述: {rec['description']}")
    print(f"  建议: {', '.join(rec['suggestions'])}")
    print()

# Build the optimization plan from the analysis
optimization_plan = tuner.generate_optimization_plan(analysis)

print("3. 优化计划:")
for plan_item in optimization_plan:
    print(f"  优先级: {plan_item['priority']} | 顺序: {plan_item['order']}")
    print(f"  问题: {plan_item['issue']}")
    print(f"  预期影响: {plan_item['estimated_impact']} | 实施难度: {plan_item['implementation_difficulty']}")
    print(f"  行动项: {', '.join(plan_item['action_items'])}")
    print()

总结

本章详细介绍了HBase的高级操作与客户端API,包括:

关键要点

  1. 批量操作:通过批量处理提高操作效率,支持多种执行模式
  2. 高级过滤器:实现复杂的数据筛选和查询逻辑
  3. 计数器操作:提供原子性的计数功能,支持分布式计数
  4. 协处理器:实现服务端数据处理,包括审计、安全和验证
  5. 异步操作:通过异步编程提高并发性能
  6. 性能监控:实时监控系统性能,及时发现问题

最佳实践

  1. 合理使用批量操作:根据数据量和网络条件选择合适的批次大小
  2. 优化过滤器使用:在服务端过滤数据,减少网络传输
  3. 谨慎使用协处理器:避免在协处理器中执行耗时操作
  4. 监控关键指标:持续监控延迟、吞吐量和错误率
  5. 异步处理大量数据:使用异步操作处理高并发场景

下一步学习

  • HBase集群管理与运维
  • HBase与其他大数据组件集成
  • HBase性能调优深入实践
  • HBase故障排除与恢复