概述
本章将深入探讨HBase的高级操作和客户端API,包括批量操作、过滤器、计数器、协处理器等高级功能。这些功能能够帮助开发者构建更高效、更强大的HBase应用程序。
学习目标
- 掌握HBase批量操作的使用方法
- 理解各种过滤器的应用场景
- 学会使用计数器进行原子操作
- 了解协处理器的工作原理
- 掌握客户端连接池和性能优化
- 学习异步操作和并发处理
1. 批量操作
1.1 批量操作基础
from typing import List, Dict, Any, Optional, Union
from enum import Enum
from dataclasses import dataclass
import time
import threading
from concurrent.futures import ThreadPoolExecutor, Future
class BatchOperationType(Enum):
    """Kinds of operations a batch may contain."""

    PUT = "put"                            # write a cell
    DELETE = "delete"                      # remove a cell / column / row
    INCREMENT = "increment"                # atomic counter bump
    APPEND = "append"                      # append to an existing value
    CHECK_AND_PUT = "check_and_put"        # conditional write
    CHECK_AND_DELETE = "check_and_delete"  # conditional delete
class BatchExecutionMode(Enum):
    """How a batch of queued operations is dispatched."""

    SEQUENTIAL = "sequential"  # one operation at a time, in order
    PARALLEL = "parallel"      # all operations on a thread pool
    MIXED = "mixed"            # PUTs in parallel, everything else sequentially
@dataclass
class BatchOperation:
    """One queued batch operation: a single mutation against one cell."""

    operation_type: BatchOperationType     # what to do (PUT / DELETE / ...)
    row_key: str                           # target row
    column_family: str                     # target column family ("" means whole row for DELETE)
    qualifier: str                         # target column qualifier ("" means whole family)
    value: Any = None                      # cell value, or increment amount for INCREMENT
    timestamp: Optional[int] = None        # optional explicit cell timestamp
    ttl: Optional[int] = None              # optional time-to-live (used by PUT)
    check_family: Optional[str] = None     # CHECK_AND_* precondition: column family
    check_qualifier: Optional[str] = None  # CHECK_AND_* precondition: qualifier
    check_value: Any = None                # CHECK_AND_* precondition: expected value
    operation_id: Optional[str] = None     # client-assigned id for tracing results
@dataclass
class BatchResult:
    """Outcome of one executed batch operation."""

    operation_id: str                             # id of the originating operation
    success: bool                                 # True when the operation succeeded
    operation_type: BatchOperationType            # kind of operation that ran
    row_key: str                                  # row the operation targeted
    execution_time: float                         # wall-clock duration in seconds
    error_message: Optional[str] = None           # populated only on failure
    result_data: Optional[Dict[str, Any]] = None  # optional extra payload
class HBaseBatchClient:
    """Batch-operation client for HBase.

    Operations (PUT / DELETE / INCREMENT / CHECK_AND_PUT) are queued via the
    fluent ``add_*`` methods and sent with :meth:`execute`, which dispatches
    them sequentially, in parallel, or in a mixed mode depending on
    ``execution_mode``.

    NOTE(review): the low-level ``_execute_put`` etc. only simulate network
    calls with ``time.sleep`` -- wire them to a real HBase connection before
    production use.
    """

    def __init__(self, connection, batch_size: int = 100,
                 execution_mode: BatchExecutionMode = BatchExecutionMode.SEQUENTIAL):
        self.connection = connection   # real HBase connection (unused by the simulation)
        self.batch_size = batch_size   # advisory batch size, reported in statistics
        self.execution_mode = execution_mode
        self.operations: List[BatchOperation] = []  # pending queue, guarded by _lock
        self.results: List[BatchResult] = []        # kept for interface compatibility
        # Cumulative statistics across all execute() calls.
        self.total_operations = 0
        self.successful_operations = 0
        self.failed_operations = 0
        self._lock = threading.Lock()

    def _enqueue(self, prefix: str, **fields) -> 'HBaseBatchClient':
        """Create a BatchOperation and append it to the queue.

        The operation_id is derived from the queue length *inside* the lock,
        so concurrent adders cannot produce duplicate ids (previously it was
        computed outside the lock, which was racy).
        """
        with self._lock:
            operation = BatchOperation(
                operation_id=f"{prefix}_{len(self.operations)}", **fields)
            self.operations.append(operation)
        return self

    def add_put(self, row_key: str, column_family: str, qualifier: str,
                value: Any, timestamp: Optional[int] = None,
                ttl: Optional[int] = None) -> 'HBaseBatchClient':
        """Queue a PUT; returns self for chaining."""
        return self._enqueue(
            "put",
            operation_type=BatchOperationType.PUT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=value,
            timestamp=timestamp,
            ttl=ttl,
        )

    def add_delete(self, row_key: str, column_family: Optional[str] = None,
                   qualifier: Optional[str] = None,
                   timestamp: Optional[int] = None) -> 'HBaseBatchClient':
        """Queue a DELETE; empty family/qualifier means the whole row/family."""
        return self._enqueue(
            "delete",
            operation_type=BatchOperationType.DELETE,
            row_key=row_key,
            column_family=column_family or "",
            qualifier=qualifier or "",
            timestamp=timestamp,
        )

    def add_increment(self, row_key: str, column_family: str, qualifier: str,
                      increment_value: int = 1) -> 'HBaseBatchClient':
        """Queue an atomic INCREMENT of a counter cell."""
        return self._enqueue(
            "increment",
            operation_type=BatchOperationType.INCREMENT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=increment_value,
        )

    def add_check_and_put(self, row_key: str, column_family: str, qualifier: str,
                          value: Any, check_family: str, check_qualifier: str,
                          check_value: Any) -> 'HBaseBatchClient':
        """Queue a CHECK_AND_PUT (write only if the check cell matches)."""
        return self._enqueue(
            "check_and_put",
            operation_type=BatchOperationType.CHECK_AND_PUT,
            row_key=row_key,
            column_family=column_family,
            qualifier=qualifier,
            value=value,
            check_family=check_family,
            check_qualifier=check_qualifier,
            check_value=check_value,
        )

    def execute(self, table_name: str) -> List[BatchResult]:
        """Execute all queued operations against *table_name*.

        The queue is atomically snapshotted and replaced up front, so
        operations added concurrently during execution go into the next
        batch instead of being dropped by a trailing clear().
        """
        with self._lock:
            pending, self.operations = self.operations, []
        if not pending:
            return []
        start_time = time.time()
        if self.execution_mode == BatchExecutionMode.SEQUENTIAL:
            results = self._execute_operations_sequential(table_name, pending)
        elif self.execution_mode == BatchExecutionMode.PARALLEL:
            results = self._execute_operations_parallel(table_name, pending,
                                                        max_workers_cap=10)
        else:
            results = self._execute_mixed(table_name, pending)
        # Update cumulative statistics.
        self.total_operations += len(pending)
        self.successful_operations += sum(1 for r in results if r.success)
        self.failed_operations += sum(1 for r in results if not r.success)
        execution_time = time.time() - start_time
        print(f"批量操作完成: {len(results)} 个操作,耗时 {execution_time:.2f} 秒")
        return results

    def _execute_mixed(self, table_name: str,
                       operations: List[BatchOperation]) -> List[BatchResult]:
        """Mixed mode: PUTs run in parallel, everything else sequentially."""
        puts = [op for op in operations if op.operation_type == BatchOperationType.PUT]
        others = [op for op in operations if op.operation_type != BatchOperationType.PUT]
        results: List[BatchResult] = []
        if puts:
            # Keep the original, smaller worker cap used by mixed mode.
            results.extend(self._execute_operations_parallel(table_name, puts,
                                                             max_workers_cap=5))
        if others:
            results.extend(self._execute_operations_sequential(table_name, others))
        return results

    def _execute_operations_parallel(self, table_name: str,
                                     operations: List[BatchOperation],
                                     max_workers_cap: int = 5) -> List[BatchResult]:
        """Run *operations* on a thread pool, preserving submission order.

        Each future is paired with its operation so a timeout/exception
        produces a failure record that still identifies the operation
        (previously failures were reported with "unknown" ids).
        """
        results: List[BatchResult] = []
        max_workers = min(max_workers_cap, len(operations))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            submitted = [(op, executor.submit(self._execute_single_operation, table_name, op))
                         for op in operations]
            for op, future in submitted:
                try:
                    results.append(future.result(timeout=30))
                except Exception as e:
                    results.append(BatchResult(
                        operation_id=op.operation_id or "unknown",
                        success=False,
                        operation_type=op.operation_type,
                        row_key=op.row_key,
                        execution_time=0,
                        error_message=str(e),
                    ))
        return results

    def _execute_operations_sequential(self, table_name: str,
                                       operations: List[BatchOperation]) -> List[BatchResult]:
        """Run *operations* one after another on the calling thread."""
        return [self._execute_single_operation(table_name, op) for op in operations]

    def _execute_single_operation(self, table_name: str,
                                  operation: BatchOperation) -> BatchResult:
        """Execute one operation and wrap its outcome in a BatchResult."""
        start_time = time.time()
        handlers = {
            BatchOperationType.PUT: self._execute_put,
            BatchOperationType.DELETE: self._execute_delete,
            BatchOperationType.INCREMENT: self._execute_increment,
            BatchOperationType.CHECK_AND_PUT: self._execute_check_and_put,
        }
        try:
            handler = handlers.get(operation.operation_type)
            if handler is not None:
                success = handler(table_name, operation)
                error_message = None
            else:
                # APPEND / CHECK_AND_DELETE are not implemented: report why
                # instead of failing silently as before.
                success = False
                error_message = f"unsupported operation type: {operation.operation_type.value}"
            return BatchResult(
                operation_id=operation.operation_id,
                success=success,
                operation_type=operation.operation_type,
                row_key=operation.row_key,
                execution_time=time.time() - start_time,
                error_message=error_message,
            )
        except Exception as e:
            return BatchResult(
                operation_id=operation.operation_id,
                success=False,
                operation_type=operation.operation_type,
                row_key=operation.row_key,
                execution_time=time.time() - start_time,
                error_message=str(e),
            )

    def _execute_put(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulated PUT (sleep stands in for the RPC round-trip)."""
        time.sleep(0.01)
        return True

    def _execute_delete(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulated DELETE."""
        time.sleep(0.01)
        return True

    def _execute_increment(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulated INCREMENT (slightly slower than a plain write)."""
        time.sleep(0.02)
        return True

    def _execute_check_and_put(self, table_name: str, operation: BatchOperation) -> bool:
        """Simulated CHECK_AND_PUT (read + conditional write)."""
        time.sleep(0.03)
        return True

    def get_statistics(self) -> Dict[str, Any]:
        """Return cumulative execution statistics plus current queue state."""
        return {
            "total_operations": self.total_operations,
            "successful_operations": self.successful_operations,
            "failed_operations": self.failed_operations,
            "success_rate": round(self.successful_operations / max(self.total_operations, 1) * 100, 2),
            "pending_operations": len(self.operations),
            "execution_mode": self.execution_mode.value,
            "batch_size": self.batch_size,
        }

    def clear(self) -> 'HBaseBatchClient':
        """Drop all pending operations; returns self for chaining."""
        with self._lock:
            self.operations.clear()
        return self
# Batch-operation usage example.
print("=== HBase批量操作示例 ===")
# Create the batch client (no real connection: operations are simulated).
batch_client = HBaseBatchClient(None, batch_size=50, execution_mode=BatchExecutionMode.PARALLEL)
# Queue the operations: three PUTs per user, plus an INCREMENT for every tenth user.
for i in range(100):
    batch_client.add_put(f"user_{i:04d}", "info", "name", f"用户{i}")
    batch_client.add_put(f"user_{i:04d}", "info", "age", str(20 + i % 50))
    batch_client.add_put(f"user_{i:04d}", "stats", "login_count", "0")
    if i % 10 == 0:
        batch_client.add_increment(f"user_{i:04d}", "stats", "login_count", 1)
# Execute everything that was queued.
results = batch_client.execute("users")
# Split the results into successes and failures.
successful_results = [r for r in results if r.success]
failed_results = [r for r in results if not r.success]
print(f"批量操作结果:")
print(f" 总操作数: {len(results)}")
print(f" 成功操作: {len(successful_results)}")
print(f" 失败操作: {len(failed_results)}")
print(f" 平均执行时间: {sum(r.execution_time for r in results) / len(results):.4f} 秒")
# Show the client's cumulative statistics.
stats = batch_client.get_statistics()
print(f"\n客户端统计信息: {stats}")
1.2 智能批量操作
class SmartBatchClient:
    """Batch client that auto-flushes and adaptively tunes its batch size.

    Operations accumulate until ``batch_size`` is reached, at which point
    they are flushed automatically.  After every flush the recent
    throughput / success-rate history is used to grow or shrink the batch
    size within ``[min_batch_size, max_batch_size]``.

    NOTE(review): execution is simulated (sleep + 95% random success); wire
    in a real connection before production use.
    """

    def __init__(self, connection, initial_batch_size: int = 100,
                 default_table: str = "default"):
        self.connection = connection
        self.batch_size = initial_batch_size
        # Table used by the auto-flush path (previously hard-coded "default").
        self.default_table = default_table
        self.operations: List[BatchOperation] = []
        self.performance_history: List[Dict[str, Any]] = []
        self.auto_optimize = True   # set False to freeze batch_size
        self.min_batch_size = 10
        self.max_batch_size = 1000

    def add_operation(self, operation: BatchOperation) -> 'SmartBatchClient':
        """Queue *operation*; auto-flush to ``default_table`` when full."""
        self.operations.append(operation)
        if len(self.operations) >= self.batch_size:
            self.flush()
        return self

    def flush(self, table_name: Optional[str] = None) -> List[BatchResult]:
        """Execute and clear all pending operations.

        *table_name* defaults to ``self.default_table``, so the auto-flush
        path no longer silently targets a hard-coded table.
        """
        if not self.operations:
            return []
        if table_name is None:
            table_name = self.default_table
        start_time = time.time()
        results = self._execute_batch(table_name)
        execution_time = time.time() - start_time
        # Record performance for the optimizer; guard the interval so
        # throughput can never raise ZeroDivisionError.
        self.performance_history.append({
            "batch_size": len(self.operations),
            "execution_time": execution_time,
            "throughput": len(self.operations) / max(execution_time, 1e-9),
            "success_rate": sum(1 for r in results if r.success) / len(results),
            "timestamp": time.time(),
        })
        if self.auto_optimize:
            self._optimize_batch_size()
        self.operations.clear()
        return results

    def _execute_batch(self, table_name: str) -> List[BatchResult]:
        """Execute pending operations grouped by type (PUTs in parallel)."""
        results: List[BatchResult] = []
        for operation_type, operations in self._group_operations_by_type().items():
            if operation_type == BatchOperationType.PUT:
                results.extend(self._execute_parallel_operations(table_name, operations))
            else:
                results.extend(self._execute_sequential_operations(table_name, operations))
        return results

    def _group_operations_by_type(self) -> Dict[BatchOperationType, List[BatchOperation]]:
        """Bucket pending operations by their operation type."""
        groups: Dict[BatchOperationType, List[BatchOperation]] = {}
        for operation in self.operations:
            groups.setdefault(operation.operation_type, []).append(operation)
        return groups

    def _execute_parallel_operations(self, table_name: str,
                                     operations: List[BatchOperation]) -> List[BatchResult]:
        """Run *operations* on a small thread pool, keeping submission order.

        Futures are paired with their operations so failures are reported
        with the real operation id / row key instead of placeholders.
        """
        results: List[BatchResult] = []
        max_workers = min(8, len(operations))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            submitted = [(op, executor.submit(self._simulate_operation, table_name, op))
                         for op in operations]
            for op, future in submitted:
                try:
                    results.append(future.result(timeout=10))
                except Exception as e:
                    results.append(BatchResult(
                        operation_id=op.operation_id or "error",
                        success=False,
                        operation_type=op.operation_type,
                        row_key=op.row_key,
                        execution_time=0,
                        error_message=str(e),
                    ))
        return results

    def _execute_sequential_operations(self, table_name: str,
                                       operations: List[BatchOperation]) -> List[BatchResult]:
        """Run *operations* one by one on the calling thread."""
        return [self._simulate_operation(table_name, op) for op in operations]

    def _simulate_operation(self, table_name: str,
                            operation: BatchOperation) -> BatchResult:
        """Simulate one operation: type-dependent latency, 95% success rate."""
        import random  # local import mirrors the original module layout
        start_time = time.time()
        latency = {
            BatchOperationType.PUT: 0.005,
            BatchOperationType.DELETE: 0.003,
            BatchOperationType.INCREMENT: 0.008,
        }.get(operation.operation_type, 0.010)
        time.sleep(latency)
        execution_time = time.time() - start_time
        success = random.random() > 0.05  # simulated 95% success rate
        return BatchResult(
            operation_id=operation.operation_id,
            success=success,
            operation_type=operation.operation_type,
            row_key=operation.row_key,
            execution_time=execution_time,
            error_message=None if success else "模拟错误",
        )

    def _optimize_batch_size(self):
        """Adjust batch_size from the metrics of the last three flushes."""
        if len(self.performance_history) < 3:
            return
        recent = self.performance_history[-3:]
        avg_throughput = sum(d["throughput"] for d in recent) / len(recent)
        avg_success_rate = sum(d["success_rate"] for d in recent) / len(recent)
        if avg_success_rate < 0.9:    # struggling -> shrink the batch
            new_batch_size = max(self.min_batch_size, int(self.batch_size * 0.8))
        elif avg_throughput > 1000:   # plenty of headroom -> grow the batch
            new_batch_size = min(self.max_batch_size, int(self.batch_size * 1.2))
        else:
            new_batch_size = self.batch_size
        if new_batch_size != self.batch_size:
            print(f"批次大小优化: {self.batch_size} -> {new_batch_size}")
            self.batch_size = new_batch_size

    def get_performance_report(self) -> Dict[str, Any]:
        """Summarize the last (up to) 10 flushes."""
        if not self.performance_history:
            return {"message": "暂无性能数据"}
        recent = self.performance_history[-10:]
        return {
            "current_batch_size": self.batch_size,
            "total_batches": len(self.performance_history),
            "avg_throughput": sum(d["throughput"] for d in recent) / len(recent),
            "avg_success_rate": sum(d["success_rate"] for d in recent) / len(recent),
            "avg_execution_time": sum(d["execution_time"] for d in recent) / len(recent),
            "optimization_enabled": self.auto_optimize,
        }
# Smart batch-operation example.
print("\n=== 智能批量操作示例 ===")
smart_client = SmartBatchClient(None, initial_batch_size=50)
# Queue a large number of PUTs (auto-flush fires every time the batch fills).
for i in range(500):
    operation = BatchOperation(
        operation_type=BatchOperationType.PUT,
        row_key=f"smart_user_{i:04d}",
        column_family="info",
        qualifier="name",
        value=f"智能用户{i}",
        operation_id=f"smart_put_{i}"
    )
    smart_client.add_operation(operation)
# Manually flush whatever remains in the queue.
smart_client.flush("users")
# Print the performance report collected across all flushes.
report = smart_client.get_performance_report()
print(f"性能报告: {report}")
2. 高级过滤器
2.1 复合过滤器
from abc import ABC, abstractmethod
import re
from typing import Callable
class AdvancedFilterType(Enum):
    """Kinds of advanced filters supported by this module."""

    SINGLE_COLUMN_VALUE = "single_column_value"      # one column compared to a value
    MULTIPLE_COLUMN_VALUE = "multiple_column_value"  # several conditions combined
    ROW_KEY_REGEX = "row_key_regex"                  # row key matched against a regex
    COLUMN_PREFIX = "column_prefix"                  # any column with a given prefix
    TIMESTAMP_RANGE = "timestamp_range"              # cell timestamps within a range
    CUSTOM_FUNCTION = "custom_function"              # arbitrary user predicate
    FUZZY_ROW = "fuzzy_row"                          # reserved: fuzzy row matching
    PAGE_FILTER = "page_filter"                      # limit rows per page
class ComparisonOperator(Enum):
    """Comparison operators usable in column-value filters."""

    EQUAL = "="
    NOT_EQUAL = "!="
    GREATER = ">"
    GREATER_OR_EQUAL = ">="
    LESS = "<"
    LESS_OR_EQUAL = "<="
    REGEX_MATCH = "~"           # value interpreted as a regex pattern
    SUBSTRING = "contains"      # filter value contained in the cell value
    STARTS_WITH = "starts_with"
    ENDS_WITH = "ends_with"
class LogicalOperator(Enum):
    """Logical connectives for combining multiple filter verdicts."""

    AND = "AND"
    OR = "OR"
    NOT = "NOT"
class AdvancedFilter(ABC):
    """Common base for all advanced filters.

    A filter decides, per row, whether the row is kept (True) or dropped
    (False).  A disabled filter passes every row through unchanged.
    """

    def __init__(self, filter_type: AdvancedFilterType):
        self.filter_type = filter_type
        self.enabled = True  # disabled filters are no-ops

    @abstractmethod
    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True when the row passes this filter."""

    @abstractmethod
    def get_description(self) -> str:
        """Return a short human-readable description of the filter."""

    def enable(self) -> 'AdvancedFilter':
        """Turn the filter on; returns self for chaining."""
        self.enabled = True
        return self

    def disable(self) -> 'AdvancedFilter':
        """Turn the filter off (it will pass every row); returns self."""
        self.enabled = False
        return self
class SingleColumnValueFilter(AdvancedFilter):
    """Keeps rows where one column's value satisfies a comparison.

    When the column is absent, the row passes unless ``filter_if_missing``
    is True.
    """

    def __init__(self, column_family: str, qualifier: str,
                 operator: ComparisonOperator, value: Any,
                 filter_if_missing: bool = False):
        super().__init__(AdvancedFilterType.SINGLE_COLUMN_VALUE)
        self.column_family = column_family
        self.qualifier = qualifier
        self.operator = operator
        self.value = value
        self.filter_if_missing = filter_if_missing
        # Pre-built "family:qualifier" lookup key.
        self.column_name = f"{column_family}:{qualifier}"

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Compare the cell value against the configured filter value."""
        if not self.enabled:
            return True
        if self.column_name not in row_data:
            return not self.filter_if_missing
        return self._compare_values(row_data[self.column_name]["value"],
                                    self.value, self.operator)

    def _compare_values(self, cell_value: Any, filter_value: Any,
                        operator: ComparisonOperator) -> bool:
        """Evaluate one comparison; conversion failures count as no match."""
        comparators = {
            ComparisonOperator.EQUAL: lambda c, f: str(c) == str(f),
            ComparisonOperator.NOT_EQUAL: lambda c, f: str(c) != str(f),
            ComparisonOperator.GREATER: lambda c, f: float(c) > float(f),
            ComparisonOperator.GREATER_OR_EQUAL: lambda c, f: float(c) >= float(f),
            ComparisonOperator.LESS: lambda c, f: float(c) < float(f),
            ComparisonOperator.LESS_OR_EQUAL: lambda c, f: float(c) <= float(f),
            ComparisonOperator.REGEX_MATCH: lambda c, f: bool(re.match(str(f), str(c))),
            ComparisonOperator.SUBSTRING: lambda c, f: str(f) in str(c),
            ComparisonOperator.STARTS_WITH: lambda c, f: str(c).startswith(str(f)),
            ComparisonOperator.ENDS_WITH: lambda c, f: str(c).endswith(str(f)),
        }
        comparator = comparators.get(operator)
        if comparator is None:
            return False
        try:
            return comparator(cell_value, filter_value)
        except (ValueError, TypeError):
            # Non-numeric value in a numeric comparison, etc.
            return False

    def get_description(self) -> str:
        return f"SingleColumnValue({self.column_name} {self.operator.value} {self.value})"
class MultipleColumnValueFilter(AdvancedFilter):
    """Filter combining several column-value conditions with AND/OR/NOT.

    Each condition is a dict with keys ``column_family``, ``qualifier``,
    ``operator`` (a ComparisonOperator value string), ``value``, and an
    optional ``filter_if_missing`` flag.
    """

    def __init__(self, conditions: List[Dict[str, Any]],
                 logical_operator: LogicalOperator = LogicalOperator.AND):
        super().__init__(AdvancedFilterType.MULTIPLE_COLUMN_VALUE)
        self.conditions = conditions
        self.logical_operator = logical_operator

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Evaluate every condition and combine with the logical operator."""
        if not self.enabled:
            return True
        results = []
        for condition in self.conditions:
            column_name = f"{condition['column_family']}:{condition['qualifier']}"
            if column_name not in row_data:
                # Missing column: pass unless filter_if_missing says otherwise.
                results.append(not condition.get('filter_if_missing', False))
                continue
            cell_value = row_data[column_name]["value"]
            operator = ComparisonOperator(condition['operator'])
            results.append(self._compare_values(cell_value, condition['value'], operator))
        if self.logical_operator == LogicalOperator.AND:
            return all(results)
        elif self.logical_operator == LogicalOperator.OR:
            return any(results)
        else:  # NOT: invert the AND of all conditions
            return not all(results)

    def _compare_values(self, cell_value: Any, filter_value: Any,
                        operator: ComparisonOperator) -> bool:
        """Reuse SingleColumnValueFilter's comparison logic.

        Called unbound on the class: the comparison never touches instance
        state, and the previous code built a throwaway filter object for
        every condition of every row.
        """
        return SingleColumnValueFilter._compare_values(self, cell_value, filter_value, operator)

    def get_description(self) -> str:
        condition_strs = []
        for cond in self.conditions:
            condition_strs.append(f"{cond['column_family']}:{cond['qualifier']} {cond['operator']} {cond['value']}")
        return f"MultipleColumnValue({f' {self.logical_operator.value} '.join(condition_strs)})"
class RowKeyRegexFilter(AdvancedFilter):
    """Keeps rows whose key matches a regex (anchored at the start)."""

    def __init__(self, pattern: str, flags: int = 0):
        super().__init__(AdvancedFilterType.ROW_KEY_REGEX)
        self.pattern = pattern
        self.flags = flags
        # Compile once up front; apply() runs once per row.
        self.compiled_pattern = re.compile(pattern, flags)

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Match the row key against the pre-compiled pattern."""
        if not self.enabled:
            return True
        return bool(self.compiled_pattern.match(row_key))

    def get_description(self) -> str:
        return f"RowKeyRegex({self.pattern})"
class ColumnPrefixFilter(AdvancedFilter):
    """Keeps rows that contain at least one column starting with a prefix."""

    def __init__(self, prefix: str):
        super().__init__(AdvancedFilterType.COLUMN_PREFIX)
        self.prefix = prefix

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Pass the row if any of its column names carries the prefix."""
        if not self.enabled:
            return True
        return any(column_name.startswith(self.prefix) for column_name in row_data)

    def get_description(self) -> str:
        return f"ColumnPrefix({self.prefix})"
class TimestampRangeFilter(AdvancedFilter):
    """Keeps rows having at least one cell whose timestamp is in range.

    A bound set to None is open.  Bounds are inclusive: a cell passes when
    it is neither below ``min_timestamp`` nor above ``max_timestamp``.
    """

    def __init__(self, min_timestamp: Optional[int] = None,
                 max_timestamp: Optional[int] = None):
        super().__init__(AdvancedFilterType.TIMESTAMP_RANGE)
        self.min_timestamp = min_timestamp
        self.max_timestamp = max_timestamp

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Return True if any cell's timestamp falls within the range."""
        if not self.enabled:
            return True
        for cell_data in row_data.values():
            # Cells without a timestamp default to "now" in milliseconds.
            timestamp = cell_data.get("timestamp", int(time.time() * 1000))
            # Explicit None checks: the previous truthiness test treated a
            # legitimate bound of 0 as "no bound".
            if self.min_timestamp is not None and timestamp < self.min_timestamp:
                continue
            if self.max_timestamp is not None and timestamp > self.max_timestamp:
                continue
            return True
        return False

    def get_description(self) -> str:
        return f"TimestampRange({self.min_timestamp} - {self.max_timestamp})"
class CustomFunctionFilter(AdvancedFilter):
    """Wraps an arbitrary predicate ``f(row_key, row_data) -> bool``."""

    def __init__(self, filter_function: Callable[[str, Dict[str, Any]], bool],
                 description: str = "Custom"):
        super().__init__(AdvancedFilterType.CUSTOM_FUNCTION)
        self.filter_function = filter_function
        self.description = description

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Delegate to the wrapped predicate."""
        if not self.enabled:
            return True
        try:
            return self.filter_function(row_key, row_data)
        except Exception:
            # Deliberate best-effort: a raising user predicate drops the row
            # instead of aborting the whole scan.
            return False

    def get_description(self) -> str:
        return f"CustomFunction({self.description})"
class PageFilter(AdvancedFilter):
    """Stateful filter that admits at most ``page_size`` rows.

    NOTE: the counter persists across scans -- call :meth:`reset` before
    reusing the filter for a new page.
    """

    def __init__(self, page_size: int):
        super().__init__(AdvancedFilterType.PAGE_FILTER)
        self.page_size = page_size
        self.current_count = 0  # rows already admitted

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Admit the row while the page is not yet full."""
        if not self.enabled:
            return True
        if self.current_count >= self.page_size:
            return False
        self.current_count += 1
        return True

    def reset(self):
        """Start a new page."""
        self.current_count = 0

    def get_description(self) -> str:
        return f"Page({self.page_size})"
class FilterChain:
    """Combines several filters with one logical operator (AND / OR / NOT)."""

    def __init__(self, logical_operator: LogicalOperator = LogicalOperator.AND):
        self.filters: List[AdvancedFilter] = []
        self.logical_operator = logical_operator

    def add_filter(self, filter_obj: AdvancedFilter) -> 'FilterChain':
        """Append a filter; returns self for chaining."""
        self.filters.append(filter_obj)
        return self

    def remove_filter(self, filter_obj: AdvancedFilter) -> 'FilterChain':
        """Remove a filter if present; returns self for chaining."""
        if filter_obj in self.filters:
            self.filters.remove(filter_obj)
        return self

    def apply(self, row_key: str, row_data: Dict[str, Any]) -> bool:
        """Evaluate all enabled filters and combine their verdicts.

        An empty chain -- or one whose filters are all disabled -- passes
        every row.
        """
        verdicts = [f.apply(row_key, row_data) for f in self.filters if f.enabled]
        if not verdicts:
            return True
        if self.logical_operator == LogicalOperator.OR:
            return any(verdicts)
        if self.logical_operator == LogicalOperator.AND:
            return all(verdicts)
        return not all(verdicts)  # NOT

    def get_description(self) -> str:
        """Human-readable summary of the enabled filters."""
        if not self.filters:
            return "EmptyFilterChain"
        descriptions = [f.get_description() for f in self.filters if f.enabled]
        return f"FilterChain({f' {self.logical_operator.value} '.join(descriptions)})"

    def get_statistics(self) -> Dict[str, Any]:
        """Counts and types of the registered filters."""
        enabled = sum(1 for f in self.filters if f.enabled)
        return {
            "total_filters": len(self.filters),
            "enabled_filters": enabled,
            "disabled_filters": len(self.filters) - enabled,
            "logical_operator": self.logical_operator.value,
            "filter_types": [f.filter_type.value for f in self.filters],
        }
# Advanced-filter usage examples.
print("\n=== 高级过滤器示例 ===")
# In-memory test dataset: row_key -> {"family:qualifier": {"value", "timestamp"}}.
test_data = {
    "user001": {
        "info:name": {"value": "张三", "timestamp": 1640995200000},
        "info:age": {"value": "25", "timestamp": 1640995200000},
        "stats:login_count": {"value": "100", "timestamp": 1640995200000}
    },
    "user002": {
        "info:name": {"value": "李四", "timestamp": 1640995300000},
        "info:age": {"value": "30", "timestamp": 1640995300000},
        "stats:login_count": {"value": "50", "timestamp": 1640995300000}
    },
    "admin001": {
        "info:name": {"value": "管理员", "timestamp": 1640995400000},
        "info:age": {"value": "35", "timestamp": 1640995400000},
        "stats:login_count": {"value": "200", "timestamp": 1640995400000}
    }
}
# 1. Single-column value filter: age > 25.
age_filter = SingleColumnValueFilter("info", "age", ComparisonOperator.GREATER, "25")
print(f"年龄大于25的用户:")
for row_key, row_data in test_data.items():
    if age_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        age = row_data["info:age"]["value"]
        print(f" {row_key}: {name}, 年龄: {age}")
# 2. Multi-column value filter: age > 20 AND login_count > 80.
conditions = [
    {"column_family": "info", "qualifier": "age", "operator": ">", "value": "20"},
    {"column_family": "stats", "qualifier": "login_count", "operator": ">", "value": "80"}
]
multi_filter = MultipleColumnValueFilter(conditions, LogicalOperator.AND)
print(f"\n年龄大于20且登录次数大于80的用户:")
for row_key, row_data in test_data.items():
    if multi_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        age = row_data["info:age"]["value"]
        login_count = row_data["stats:login_count"]["value"]
        print(f" {row_key}: {name}, 年龄: {age}, 登录次数: {login_count}")
# 3. Row-key regex filter: keys matching user\d+.
row_key_filter = RowKeyRegexFilter(r"user\d+")
print(f"\n行键匹配'user\\d+'的记录:")
for row_key, row_data in test_data.items():
    if row_key_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        print(f" {row_key}: {name}")
# 4. Custom function filter.
def custom_filter_func(row_key: str, row_data: Dict[str, Any]) -> bool:
    """Custom predicate: keep users whose name contains '三' or '四'."""
    name = row_data.get("info:name", {}).get("value", "")
    return "三" in name or "四" in name
custom_filter = CustomFunctionFilter(custom_filter_func, "名字包含'三'或'四'")
print(f"\n{custom_filter.get_description()}的用户:")
for row_key, row_data in test_data.items():
    if custom_filter.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        print(f" {row_key}: {name}")
# 5. Filter chain: row key matches AND age >= 25.
filter_chain = FilterChain(LogicalOperator.AND)
filter_chain.add_filter(RowKeyRegexFilter(r"user\d+")) \
    .add_filter(SingleColumnValueFilter("info", "age", ComparisonOperator.GREATER_OR_EQUAL, "25"))
print(f"\n过滤器链结果 ({filter_chain.get_description()}):")
for row_key, row_data in test_data.items():
    if filter_chain.apply(row_key, row_data):
        name = row_data["info:name"]["value"]
        age = row_data["info:age"]["value"]
        print(f" {row_key}: {name}, 年龄: {age}")
# Show the chain's statistics.
stats = filter_chain.get_statistics()
print(f"\n过滤器链统计: {stats}")
3. 计数器操作
3.1 原子计数器
class CounterOperationType(Enum):
    """Kinds of counter operations."""

    INCREMENT = "increment"              # add a delta
    DECREMENT = "decrement"              # subtract a delta
    SET = "set"                          # overwrite the value
    GET = "get"                          # read without modifying
    RESET = "reset"                      # set back to zero
    COMPARE_AND_SET = "compare_and_set"  # conditional overwrite
class CounterDataType(Enum):
    """Numeric representation used when storing a counter value."""

    LONG = "long"                # coerced via int()
    DOUBLE = "double"            # coerced via float()
    BIG_INTEGER = "big_integer"  # arbitrary-precision integer
@dataclass
class CounterOperation:
    """One counter mutation/read request.

    ``value`` holds the delta for INCREMENT/DECREMENT, the new value for
    SET/COMPARE_AND_SET, and is unused for GET/RESET.
    """

    operation_type: CounterOperationType                 # which counter action to perform
    table_name: str                                      # target table
    row_key: str                                         # target row
    column_family: str                                   # target column family
    qualifier: str                                       # target column qualifier
    value: Union[int, float] = 0                         # delta or new value, depending on type
    expected_value: Optional[Union[int, float]] = None   # precondition, COMPARE_AND_SET only
    data_type: CounterDataType = CounterDataType.LONG    # result coercion (int vs float)
    operation_id: Optional[str] = None                   # id for tracing results
    timestamp: Optional[int] = None                      # optional cell timestamp
@dataclass
class CounterResult:
    """Outcome of one counter operation, including old and new values."""

    operation_id: str                     # id of the originating operation
    success: bool                         # False e.g. when a CAS expectation fails
    operation_type: CounterOperationType  # which counter action ran
    row_key: str                          # target row
    column_name: str                      # "family:qualifier" of the counter cell
    old_value: Union[int, float]          # value before the operation
    new_value: Union[int, float]          # value after the operation
    execution_time: float                 # wall-clock duration in seconds
    error_message: Optional[str] = None   # populated only on failure
class HBaseCounter:
"""HBase计数器操作类"""
def __init__(self, connection, table_name: str):
self.connection = connection
self.table_name = table_name
self.counters: Dict[str, Union[int, float]] = {} # 模拟计数器存储
self.operation_history: List[CounterResult] = []
self._lock = threading.Lock()
def increment(self, row_key: str, column_family: str, qualifier: str,
increment_value: Union[int, float] = 1,
data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
"""增加计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.INCREMENT,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
value=increment_value,
data_type=data_type,
operation_id=f"inc_{len(self.operation_history)}"
)
)
def decrement(self, row_key: str, column_family: str, qualifier: str,
decrement_value: Union[int, float] = 1,
data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
"""减少计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.DECREMENT,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
value=decrement_value,
data_type=data_type,
operation_id=f"dec_{len(self.operation_history)}"
)
)
def set_counter(self, row_key: str, column_family: str, qualifier: str,
value: Union[int, float],
data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
"""设置计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.SET,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
value=value,
data_type=data_type,
operation_id=f"set_{len(self.operation_history)}"
)
)
def get_counter(self, row_key: str, column_family: str, qualifier: str) -> CounterResult:
"""获取计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.GET,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
operation_id=f"get_{len(self.operation_history)}"
)
)
def reset_counter(self, row_key: str, column_family: str, qualifier: str) -> CounterResult:
"""重置计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.RESET,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
value=0,
operation_id=f"reset_{len(self.operation_history)}"
)
)
def compare_and_set(self, row_key: str, column_family: str, qualifier: str,
expected_value: Union[int, float], new_value: Union[int, float],
data_type: CounterDataType = CounterDataType.LONG) -> CounterResult:
"""比较并设置计数器值"""
return self._execute_counter_operation(
CounterOperation(
operation_type=CounterOperationType.COMPARE_AND_SET,
table_name=self.table_name,
row_key=row_key,
column_family=column_family,
qualifier=qualifier,
value=new_value,
expected_value=expected_value,
data_type=data_type,
operation_id=f"cas_{len(self.operation_history)}"
)
)
    def _execute_counter_operation(self, operation: CounterOperation) -> CounterResult:
        """Execute one counter operation atomically under the client lock.

        Dispatches on ``operation.operation_type``, coerces the result to the
        requested data type, persists the new value (GET is read-only, and a
        failed compare-and-set stores nothing), and appends a CounterResult
        to ``operation_history``. Failures never raise; they are returned as
        a CounterResult with ``success=False``.
        """
        start_time = time.time()
        # Each counter cell is addressed by "row:family:qualifier".
        counter_key = f"{operation.row_key}:{operation.column_family}:{operation.qualifier}"
        with self._lock:
            try:
                old_value = self.counters.get(counter_key, 0)  # unset counters read as 0
                new_value = old_value
                success = True
                error_message = None
                if operation.operation_type == CounterOperationType.INCREMENT:
                    new_value = old_value + operation.value
                elif operation.operation_type == CounterOperationType.DECREMENT:
                    new_value = old_value - operation.value
                elif operation.operation_type == CounterOperationType.SET:
                    new_value = operation.value
                elif operation.operation_type == CounterOperationType.GET:
                    new_value = old_value
                elif operation.operation_type == CounterOperationType.RESET:
                    new_value = 0
                elif operation.operation_type == CounterOperationType.COMPARE_AND_SET:
                    # CAS only succeeds when the stored value matches the expectation.
                    if old_value == operation.expected_value:
                        new_value = operation.value
                    else:
                        success = False
                        error_message = f"期望值 {operation.expected_value} 不匹配当前值 {old_value}"
                        new_value = old_value
                # Coerce to the requested numeric type.
                if operation.data_type == CounterDataType.LONG:
                    new_value = int(new_value)
                elif operation.data_type == CounterDataType.DOUBLE:
                    new_value = float(new_value)
                # Persist the new value (except for GET, which must not mutate).
                if success and operation.operation_type != CounterOperationType.GET:
                    self.counters[counter_key] = new_value
                execution_time = time.time() - start_time
                result = CounterResult(
                    operation_id=operation.operation_id,
                    success=success,
                    operation_type=operation.operation_type,
                    row_key=operation.row_key,
                    column_name=f"{operation.column_family}:{operation.qualifier}",
                    old_value=old_value,
                    new_value=new_value,
                    execution_time=execution_time,
                    error_message=error_message
                )
                self.operation_history.append(result)
                return result
            except Exception as e:
                execution_time = time.time() - start_time
                # NOTE(review): error results report old_value/new_value as 0
                # regardless of what is actually stored for this counter.
                error_result = CounterResult(
                    operation_id=operation.operation_id,
                    success=False,
                    operation_type=operation.operation_type,
                    row_key=operation.row_key,
                    column_name=f"{operation.column_family}:{operation.qualifier}",
                    old_value=0,
                    new_value=0,
                    execution_time=execution_time,
                    error_message=str(e)
                )
                self.operation_history.append(error_result)
                return error_result
def batch_increment(self, operations: List[Dict[str, Any]]) -> List[CounterResult]:
"""批量增加计数器"""
results = []
for op_data in operations:
result = self.increment(
row_key=op_data['row_key'],
column_family=op_data['column_family'],
qualifier=op_data['qualifier'],
increment_value=op_data.get('value', 1),
data_type=CounterDataType(op_data.get('data_type', 'long'))
)
results.append(result)
return results
def get_all_counters(self) -> Dict[str, Union[int, float]]:
"""获取所有计数器值"""
with self._lock:
return self.counters.copy()
def get_counter_statistics(self) -> Dict[str, Any]:
"""获取计数器统计信息"""
successful_ops = [r for r in self.operation_history if r.success]
failed_ops = [r for r in self.operation_history if not r.success]
return {
"total_operations": len(self.operation_history),
"successful_operations": len(successful_ops),
"failed_operations": len(failed_ops),
"success_rate": round(len(successful_ops) / max(len(self.operation_history), 1) * 100, 2),
"total_counters": len(self.counters),
"avg_execution_time": round(sum(r.execution_time for r in self.operation_history) / max(len(self.operation_history), 1), 4),
"operation_types": {op_type.value: sum(1 for r in self.operation_history if r.operation_type == op_type)
for op_type in CounterOperationType}
}
def clear_history(self):
"""清空操作历史"""
self.operation_history.clear()
    def clear_counters(self):
        """Remove every counter value, holding the client lock for thread safety."""
        with self._lock:
            self.counters.clear()
# --- Counter usage demo --------------------------------------------------
print("\n=== HBase计数器操作示例 ===")
# Build a counter client; the connection is unused in this simulation.
counter_client = HBaseCounter(None, "user_stats")
# Basic single-counter operations.
print("1. 基本计数器操作:")
# Bump the login counter for user001 by 1.
result1 = counter_client.increment("user001", "stats", "login_count", 1)
print(f" 增加登录次数: {result1.old_value} -> {result1.new_value}")
# Increment again with a larger step.
result2 = counter_client.increment("user001", "stats", "login_count", 5)
print(f" 再次增加: {result2.old_value} -> {result2.new_value}")
# Overwrite the page-view counter outright.
result3 = counter_client.set_counter("user001", "stats", "page_views", 100)
print(f" 设置页面浏览次数: {result3.old_value} -> {result3.new_value}")
# Read the current value back.
result4 = counter_client.get_counter("user001", "stats", "login_count")
print(f" 当前登录次数: {result4.new_value}")
# CAS: succeeds only if the stored value equals the expected value (6).
result5 = counter_client.compare_and_set("user001", "stats", "login_count", 6, 10)
print(f" 比较并设置 (期望6->10): 成功={result5.success}, {result5.old_value} -> {result5.new_value}")
# Batch increments across several rows in one call.
print("\n2. 批量计数器操作:")
batch_operations = [
    {"row_key": "user002", "column_family": "stats", "qualifier": "login_count", "value": 3},
    {"row_key": "user002", "column_family": "stats", "qualifier": "page_views", "value": 50},
    {"row_key": "user003", "column_family": "stats", "qualifier": "login_count", "value": 1}
]
batch_results = counter_client.batch_increment(batch_operations)
for result in batch_results:
    print(f" {result.row_key} {result.column_name}: {result.old_value} -> {result.new_value}")
# Dump every counter the client holds.
print("\n3. 所有计数器值:")
all_counters = counter_client.get_all_counters()
for counter_key, value in all_counters.items():
    print(f" {counter_key}: {value}")
# Aggregated success-rate / timing statistics.
stats = counter_client.get_counter_statistics()
print(f"\n4. 计数器统计信息: {stats}")
3.2 分布式计数器
class DistributedCounter:
    """Sharded counter that spreads writes across several cells.

    Each increment lands on a randomly chosen shard so concurrent writers
    contend less; the logical total is the sum over all shards. Shard state
    is held in an in-memory dict here (simulating HBase cells).
    """

    def __init__(self, connection, table_name: str, counter_name: str, shard_count: int = 10):
        self.connection = connection
        self.table_name = table_name
        self.counter_name = counter_name
        self.shard_count = shard_count
        self.counters = {}  # shard key -> partial count (simulated storage)
        self._lock = threading.Lock()

    def increment(self, increment_value: int = 1) -> int:
        """Add *increment_value* to a randomly picked shard; return the new total."""
        import random
        # Uniform shard choice keeps any single cell from becoming a hot spot.
        shard_key = f"{self.counter_name}_shard_{random.randint(0, self.shard_count - 1)}"
        with self._lock:
            self.counters[shard_key] = self.counters.get(shard_key, 0) + increment_value
        return self.get_total_count()

    def get_total_count(self) -> int:
        """Return the sum of all shard counts."""
        with self._lock:
            return sum(self.counters.values())

    def get_shard_counts(self) -> Dict[str, int]:
        """Return a snapshot of the per-shard counts."""
        with self._lock:
            return dict(self.counters)

    def reset(self):
        """Zero the counter by discarding every shard."""
        with self._lock:
            self.counters.clear()
# --- Distributed counter demo --------------------------------------------
print("\n=== 分布式计数器示例 ===")
dist_counter = DistributedCounter(None, "global_stats", "total_visits", shard_count=5)
# Drive a burst of increments; each one lands on a random shard.
for i in range(20):
    total = dist_counter.increment(1)
    if i % 5 == 0:
        print(f"第{i+1}次增加后,总计数: {total}")
# Show how the 20 increments spread across the 5 shards.
shard_counts = dist_counter.get_shard_counts()
print(f"\n分片计数分布:")
for shard_key, count in shard_counts.items():
    print(f" {shard_key}: {count}")
print(f"总计数: {dist_counter.get_total_count()}")
4. 协处理器
4.1 协处理器基础
class CoprocessorType(Enum):
    """Kinds of HBase coprocessors."""
    OBSERVER = "observer"                 # observer coprocessor (event hooks)
    ENDPOINT = "endpoint"                 # endpoint coprocessor (custom RPC)
    REGION_OBSERVER = "region_observer"   # region-level observer
    MASTER_OBSERVER = "master_observer"   # master-level observer
    WAL_OBSERVER = "wal_observer"         # write-ahead-log observer
class CoprocessorEvent(Enum):
    """Hook points a coprocessor can subscribe to (PRE_* hooks may veto the operation)."""
    PRE_PUT = "pre_put"
    POST_PUT = "post_put"
    PRE_DELETE = "pre_delete"
    POST_DELETE = "post_delete"
    PRE_GET = "pre_get"
    POST_GET = "post_get"
    PRE_SCAN = "pre_scan"
    POST_SCAN = "post_scan"
    REGION_OPEN = "region_open"
    REGION_CLOSE = "region_close"
@dataclass
class CoprocessorContext:
    """Per-invocation context handed to each coprocessor's process()."""
    table_name: str
    region_name: str
    row_key: str
    column_family: str
    qualifier: str
    value: Any
    timestamp: int             # epoch millis of the triggering mutation
    operation_type: str        # e.g. "put" / "delete"; matched against security rules
    user_info: Dict[str, Any]  # consumers read "username", defaulting to "unknown"
class HBaseCoprocessor(ABC):
    """Base class for simulated HBase coprocessors.

    Subclasses implement process(); this base tracks an enabled flag plus
    execution/error counters that feed get_statistics().
    """

    def __init__(self, coprocessor_type: CoprocessorType, name: str):
        self.coprocessor_type = coprocessor_type
        self.name = name
        self.enabled = True
        self.execution_count = 0
        self.error_count = 0
        self.total_execution_time = 0

    @abstractmethod
    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Handle *event* for *context*; return False to veto the operation."""

    def get_statistics(self) -> Dict[str, Any]:
        """Return identity fields plus derived success-rate and timing stats."""
        runs = max(self.execution_count, 1)  # guard against division by zero before first run
        return {
            "name": self.name,
            "type": self.coprocessor_type.value,
            "enabled": self.enabled,
            "execution_count": self.execution_count,
            "error_count": self.error_count,
            "success_rate": round((self.execution_count - self.error_count) / runs * 100, 2),
            "avg_execution_time": round(self.total_execution_time / runs, 4),
        }

    def enable(self):
        """Resume event processing."""
        self.enabled = True

    def disable(self):
        """Skip all processing until re-enabled."""
        self.enabled = False
class AuditCoprocessor(HBaseCoprocessor):
    """Observer that appends an audit record for every event it sees."""

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "AuditCoprocessor")
        self.audit_logs: List[Dict[str, Any]] = []

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Record the event; only returns False if logging itself raises."""
        if not self.enabled:
            return True
        started = time.time()
        try:
            self.audit_logs.append({
                "timestamp": int(time.time() * 1000),
                "event": event.value,
                "table_name": context.table_name,
                "row_key": context.row_key,
                "column": f"{context.column_family}:{context.qualifier}",
                "operation_type": context.operation_type,
                "user": context.user_info.get("username", "unknown"),
                "region": context.region_name,
            })
            # Cap memory: once past 1000 entries, keep only the newest 500.
            if len(self.audit_logs) > 1000:
                del self.audit_logs[:-500]
            self.execution_count += 1
            self.total_execution_time += time.time() - started
            return True
        except Exception as e:
            self.error_count += 1
            print(f"审计协处理器错误: {e}")
            return False

    def get_audit_logs(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* of the most recent audit entries (oldest first)."""
        return self.audit_logs[-limit:]

    def search_audit_logs(self, **filters) -> List[Dict[str, Any]]:
        """Return entries where every present filter key matches; absent keys pass."""
        return [
            entry for entry in self.audit_logs
            if all(entry.get(key, value) == value for key, value in filters.items())
        ]
class SecurityCoprocessor(HBaseCoprocessor):
    """Observer enforcing per-(table, column-family) user/operation allow-lists."""

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "SecurityCoprocessor")
        self.access_rules: Dict[str, Dict[str, Any]] = {}
        self.blocked_operations: List[Dict[str, Any]] = []

    def add_access_rule(self, table_name: str, column_family: str,
                        allowed_users: List[str], allowed_operations: List[str]):
        """Register (or replace) the allow-list for table_name:column_family."""
        self.access_rules[f"{table_name}:{column_family}"] = {
            "allowed_users": allowed_users,
            "allowed_operations": allowed_operations,
        }

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Veto (return False) when a rule exists and the user or operation is not allowed."""
        if not self.enabled:
            return True
        started = time.time()
        try:
            rule = self.access_rules.get(f"{context.table_name}:{context.column_family}")
            if rule is not None:
                user = context.user_info.get("username", "unknown")
                if user not in rule["allowed_users"]:
                    self._block_operation("用户无权限", context)
                    return False
                if context.operation_type not in rule["allowed_operations"]:
                    self._block_operation("操作不被允许", context)
                    return False
            # NOTE(review): blocked operations are not counted in execution_count.
            self.execution_count += 1
            self.total_execution_time += time.time() - started
            return True
        except Exception as e:
            self.error_count += 1
            print(f"安全协处理器错误: {e}")
            return False

    def _block_operation(self, reason: str, context: CoprocessorContext):
        """Append a record describing the denied operation."""
        self.blocked_operations.append({
            "timestamp": int(time.time() * 1000),
            "reason": reason,
            "table_name": context.table_name,
            "row_key": context.row_key,
            "column": f"{context.column_family}:{context.qualifier}",
            "operation_type": context.operation_type,
            "user": context.user_info.get("username", "unknown"),
        })

    def get_blocked_operations(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* of the most recent blocked-operation records."""
        return self.blocked_operations[-limit:]
class DataValidationCoprocessor(HBaseCoprocessor):
    """Observer that validates PRE_PUT values against registered predicates."""
    # NOTE(review): `Callable` is imported from typing only later in this
    # module (section 5.1); that import should be moved above this class.

    def __init__(self):
        super().__init__(CoprocessorType.OBSERVER, "DataValidationCoprocessor")
        self.validation_rules: Dict[str, Callable] = {}
        self.validation_errors: List[Dict[str, Any]] = []

    def add_validation_rule(self, column_pattern: str, validation_func: Callable[[Any], bool]):
        """Register *validation_func* for columns whose name contains *column_pattern* ("*" matches all)."""
        self.validation_rules[column_pattern] = validation_func

    def process(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Run matching rules on the value; veto the put when any predicate fails."""
        # Only PRE_PUT events are validated; everything else passes through.
        if not self.enabled or event not in [CoprocessorEvent.PRE_PUT]:
            return True
        started = time.time()
        try:
            column_name = f"{context.column_family}:{context.qualifier}"
            for pattern, check in self.validation_rules.items():
                # Substring match against "family:qualifier"; "*" is a wildcard.
                if pattern in column_name or pattern == "*":
                    if not check(context.value):
                        self._record_validation_error(f"验证规则 '{pattern}' 失败", context)
                        return False
            self.execution_count += 1
            self.total_execution_time += time.time() - started
            return True
        except Exception as e:
            self.error_count += 1
            print(f"数据验证协处理器错误: {e}")
            return False

    def _record_validation_error(self, reason: str, context: CoprocessorContext):
        """Append a record of the failed validation."""
        self.validation_errors.append({
            "timestamp": int(time.time() * 1000),
            "reason": reason,
            "table_name": context.table_name,
            "row_key": context.row_key,
            "column": f"{context.column_family}:{context.qualifier}",
            "value": str(context.value),
            "user": context.user_info.get("username", "unknown"),
        })

    def get_validation_errors(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to *limit* of the most recent validation-error records."""
        return self.validation_errors[-limit:]
class CoprocessorManager:
    """Registry that fans events out to the coprocessors subscribed to them."""

    def __init__(self):
        self.coprocessors: List[HBaseCoprocessor] = []
        self.event_handlers: Dict[CoprocessorEvent, List[HBaseCoprocessor]] = {}

    def register_coprocessor(self, coprocessor: HBaseCoprocessor, events: List[CoprocessorEvent]):
        """Subscribe *coprocessor* to each event in *events*."""
        self.coprocessors.append(coprocessor)
        for event in events:
            self.event_handlers.setdefault(event, []).append(coprocessor)

    def unregister_coprocessor(self, coprocessor: HBaseCoprocessor):
        """Remove *coprocessor* from the registry and from every event list."""
        if coprocessor in self.coprocessors:
            self.coprocessors.remove(coprocessor)
        for handlers in self.event_handlers.values():
            if coprocessor in handlers:
                handlers.remove(coprocessor)

    def trigger_event(self, event: CoprocessorEvent, context: CoprocessorContext) -> bool:
        """Invoke enabled subscribers in registration order; stop and return False on the first veto."""
        for coprocessor in self.event_handlers.get(event, []):
            if coprocessor.enabled and not coprocessor.process(event, context):
                return False
        return True

    def get_coprocessor_statistics(self) -> List[Dict[str, Any]]:
        """Collect get_statistics() from every registered coprocessor."""
        return [cp.get_statistics() for cp in self.coprocessors]

    def enable_all(self):
        """Enable every registered coprocessor."""
        for cp in self.coprocessors:
            cp.enable()

    def disable_all(self):
        """Disable every registered coprocessor."""
        for cp in self.coprocessors:
            cp.disable()
# --- Coprocessor demo ----------------------------------------------------
print("\n=== HBase协处理器示例 ===")
# Central registry that dispatches events to registered coprocessors.
cp_manager = CoprocessorManager()
# One instance of each coprocessor flavour.
audit_cp = AuditCoprocessor()
security_cp = SecurityCoprocessor()
validation_cp = DataValidationCoprocessor()
# Subscribe each coprocessor to the events it cares about.
cp_manager.register_coprocessor(audit_cp, [CoprocessorEvent.PRE_PUT, CoprocessorEvent.POST_PUT, CoprocessorEvent.PRE_DELETE])
cp_manager.register_coprocessor(security_cp, [CoprocessorEvent.PRE_PUT, CoprocessorEvent.PRE_DELETE, CoprocessorEvent.PRE_GET])
cp_manager.register_coprocessor(validation_cp, [CoprocessorEvent.PRE_PUT])
# Access rules: who may do what on each table:family.
security_cp.add_access_rule("users", "info", ["admin", "user1"], ["put", "get"])
security_cp.add_access_rule("users", "sensitive", ["admin"], ["put", "get", "delete"])
# Validation predicates for specific column qualifiers.
def validate_age(value):
    """Accept values parseable as an int in [0, 150]."""
    try:
        age = int(value)
        return 0 <= age <= 150
    # NOTE(review): bare except also swallows KeyboardInterrupt;
    # except (TypeError, ValueError) would be tighter.
    except:
        return False
def validate_email(value):
    """Minimal shape check for an email address."""
    import re
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, str(value)))
validation_cp.add_validation_rule("age", validate_age)
validation_cp.add_validation_rule("email", validate_email)
# Drive some simulated mutations through the pipeline.
print("1. 模拟数据操作:")
# Three contexts: a valid put, an invalid email, and an unauthorized user.
contexts = [
    CoprocessorContext(
        table_name="users",
        region_name="region1",
        row_key="user001",
        column_family="info",
        qualifier="age",
        value="25",
        timestamp=int(time.time() * 1000),
        operation_type="put",
        user_info={"username": "admin"}
    ),
    CoprocessorContext(
        table_name="users",
        region_name="region1",
        row_key="user002",
        column_family="info",
        qualifier="email",
        value="invalid-email",
        timestamp=int(time.time() * 1000),
        operation_type="put",
        user_info={"username": "user1"}
    ),
    CoprocessorContext(
        table_name="users",
        region_name="region1",
        row_key="user003",
        column_family="sensitive",
        qualifier="ssn",
        value="123-45-6789",
        timestamp=int(time.time() * 1000),
        operation_type="put",
        user_info={"username": "user1"}  # user without permission on "sensitive"
    )
]
# PRE_PUT may veto; POST_PUT only fires when the put was allowed.
for i, context in enumerate(contexts):
    print(f"\n 操作 {i+1}: {context.operation_type} {context.table_name}:{context.column_family}:{context.qualifier}")
    success = cp_manager.trigger_event(CoprocessorEvent.PRE_PUT, context)
    print(f" 结果: {'成功' if success else '失败'}")
    if success:
        cp_manager.trigger_event(CoprocessorEvent.POST_PUT, context)
# Per-coprocessor execution statistics.
print("\n2. 协处理器统计信息:")
stats = cp_manager.get_coprocessor_statistics()
for stat in stats:
    print(f" {stat['name']}: 执行{stat['execution_count']}次, 成功率{stat['success_rate']}%")
# Most recent audit entries.
print("\n3. 审计日志:")
audit_logs = audit_cp.get_audit_logs(5)
for log in audit_logs:
    print(f" {log['timestamp']}: {log['event']} {log['table_name']} by {log['user']}")
# Operations the security coprocessor rejected.
print("\n4. 被阻止的操作:")
blocked_ops = security_cp.get_blocked_operations()
for op in blocked_ops:
    print(f" {op['timestamp']}: {op['reason']} - {op['table_name']}:{op['column']} by {op['user']}")
# Values the validation coprocessor refused.
print("\n5. 验证错误:")
validation_errors = validation_cp.get_validation_errors()
for error in validation_errors:
    print(f" {error['timestamp']}: {error['reason']} - {error['column']}={error['value']}")
5. 异步操作
5.1 异步客户端
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable
class AsyncOperationType(Enum):
    """Kinds of operations the async client can run."""
    PUT = "put"
    GET = "get"
    DELETE = "delete"
    SCAN = "scan"
    BATCH = "batch"
    INCREMENT = "increment"
class AsyncOperationStatus(Enum):
    """Lifecycle states of an async operation."""
    PENDING = "pending"      # created, not yet started
    RUNNING = "running"      # executing on the thread pool
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # raised an exception
    CANCELLED = "cancelled"  # cancelled while still pending
@dataclass
class AsyncOperation:
    """Mutable record tracking one async operation through its lifecycle."""
    operation_id: str
    operation_type: AsyncOperationType
    table_name: str
    parameters: Dict[str, Any]          # operation-specific arguments
    status: AsyncOperationStatus = AsyncOperationStatus.PENDING
    result: Optional[Any] = None        # populated on completion
    error: Optional[str] = None         # populated on failure
    start_time: Optional[float] = None  # epoch seconds when execution began
    end_time: Optional[float] = None    # epoch seconds when execution finished
    callback: Optional[Callable] = None # invoked with the AsyncResult when done
@dataclass
class AsyncResult:
    """Summary delivered to callbacks (and returned internally) when an operation finishes."""
    operation_id: str
    success: bool
    result: Any
    execution_time: float               # seconds spent executing
    error_message: Optional[str] = None
class HBaseAsyncClient:
    """Simulated asynchronous HBase client.

    Each public coroutine registers an AsyncOperation, runs the (simulated)
    work on a thread pool, waits for it to finish, and returns the operation
    id. Results are delivered through the optional callback and can also be
    read back via get_operation_status().

    Refactor notes: the three former _execute_* bodies duplicated the same
    status/timing/callback boilerplate; that now lives in _run_operation().
    asyncio.get_running_loop() replaces the deprecated-in-coroutine
    asyncio.get_event_loop().
    """

    def __init__(self, connection, max_workers: int = 10):
        self.connection = connection
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.operations: Dict[str, "AsyncOperation"] = {}  # operation_id -> AsyncOperation
        self.operation_counter = 0                          # monotonically increasing id suffix
        self._lock = threading.Lock()                       # guards operation_counter

    async def put_async(self, table_name: str, row_key: str,
                        column_family: str, qualifier: str, value: Any,
                        callback: Optional[Callable] = None) -> str:
        """Asynchronously write one cell; returns the operation id once the work is done."""
        operation = self._register_operation(
            AsyncOperationType.PUT, table_name,
            {"row_key": row_key, "column_family": column_family,
             "qualifier": qualifier, "value": value},
            callback,
        )
        await self._run_in_pool(self._execute_put, operation)
        return operation.operation_id

    async def get_async(self, table_name: str, row_key: str,
                        column_family: str, qualifier: str,
                        callback: Optional[Callable] = None) -> str:
        """Asynchronously read one cell; returns the operation id once the work is done."""
        operation = self._register_operation(
            AsyncOperationType.GET, table_name,
            {"row_key": row_key, "column_family": column_family,
             "qualifier": qualifier},
            callback,
        )
        await self._run_in_pool(self._execute_get, operation)
        return operation.operation_id

    async def batch_async(self, operations: List[Dict[str, Any]],
                          callback: Optional[Callable] = None) -> str:
        """Asynchronously run a list of operation dicts as one batch."""
        operation = self._register_operation(
            AsyncOperationType.BATCH, "multiple",
            {"operations": operations},
            callback,
        )
        await self._run_in_pool(self._execute_batch, operation)
        return operation.operation_id

    # -- internal helpers --------------------------------------------------

    def _register_operation(self, op_type, table_name, parameters, callback) -> "AsyncOperation":
        """Create a pending AsyncOperation and index it by id."""
        operation = AsyncOperation(
            operation_id=self._generate_operation_id(),
            operation_type=op_type,
            table_name=table_name,
            parameters=parameters,
            callback=callback,
        )
        self.operations[operation.operation_id] = operation
        return operation

    async def _run_in_pool(self, func, operation) -> "AsyncResult":
        """Run *func(operation)* on the thread pool and await its result."""
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated there since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, func, operation)

    def _execute_put(self, operation: "AsyncOperation") -> "AsyncResult":
        """Thread-pool body for PUT (simulated with a short sleep)."""
        def work(params):
            time.sleep(0.1)  # simulated network latency
            return {
                "row_key": params["row_key"],
                "column": f"{params['column_family']}:{params['qualifier']}",
                "value": params["value"],
                "timestamp": int(time.time() * 1000)
            }
        return self._run_operation(operation, work)

    def _execute_get(self, operation: "AsyncOperation") -> "AsyncResult":
        """Thread-pool body for GET (simulated lookup)."""
        def work(params):
            time.sleep(0.05)  # simulated network latency
            return {
                "row_key": params["row_key"],
                "column": f"{params['column_family']}:{params['qualifier']}",
                "value": f"value_for_{params['row_key']}",
                "timestamp": int(time.time() * 1000)
            }
        return self._run_operation(operation, work)

    def _execute_batch(self, operation: "AsyncOperation") -> "AsyncResult":
        """Thread-pool body for BATCH (simulated per-item delay)."""
        def work(params):
            results = []
            for op in params["operations"]:
                time.sleep(0.02)  # simulated per-operation latency
                results.append({
                    "operation_type": op["type"],
                    "row_key": op["row_key"],
                    "success": True
                })
            return results
        return self._run_operation(operation, work)

    def _run_operation(self, operation: "AsyncOperation", work: Callable) -> "AsyncResult":
        """Shared lifecycle: status transitions, timing, error capture, callback dispatch."""
        operation.status = AsyncOperationStatus.RUNNING
        operation.start_time = time.time()
        try:
            outcome = work(operation.parameters)
            operation.status = AsyncOperationStatus.COMPLETED
            operation.result = outcome
            success, error = True, None
        except Exception as e:
            operation.status = AsyncOperationStatus.FAILED
            operation.error = str(e)
            outcome, success, error = None, False, str(e)
        operation.end_time = time.time()
        async_result = AsyncResult(
            operation_id=operation.operation_id,
            success=success,
            result=outcome,
            execution_time=operation.end_time - operation.start_time,
            error_message=error,
        )
        # Callbacks run on the worker thread, matching the original behavior.
        if operation.callback:
            operation.callback(async_result)
        return async_result

    def _generate_operation_id(self) -> str:
        """Return a unique id: lock-protected counter plus epoch millis."""
        with self._lock:
            self.operation_counter += 1
            return f"async_op_{self.operation_counter}_{int(time.time() * 1000)}"

    def get_operation_status(self, operation_id: str) -> Optional["AsyncOperation"]:
        """Return the tracked operation, or None if the id is unknown."""
        return self.operations.get(operation_id)

    def cancel_operation(self, operation_id: str) -> bool:
        """Cancel a still-pending operation; running/finished ones cannot be cancelled."""
        operation = self.operations.get(operation_id)
        if operation is not None and operation.status == AsyncOperationStatus.PENDING:
            operation.status = AsyncOperationStatus.CANCELLED
            return True
        return False

    def get_all_operations(self) -> List["AsyncOperation"]:
        """Return a snapshot list of every tracked operation."""
        return list(self.operations.values())

    def get_statistics(self) -> Dict[str, Any]:
        """Aggregate status distribution, average runtime and success rate."""
        operations = list(self.operations.values())
        status_counts = {
            status.value: sum(1 for op in operations if op.status == status)
            for status in AsyncOperationStatus
        }
        completed = [op for op in operations if op.status == AsyncOperationStatus.COMPLETED]
        avg_time = (
            sum(op.end_time - op.start_time for op in completed) / len(completed)
            if completed else 0
        )
        return {
            "total_operations": len(operations),
            "status_distribution": status_counts,
            "avg_execution_time": round(avg_time, 4),
            "success_rate": round(status_counts.get("completed", 0) / max(len(operations), 1) * 100, 2)
        }

    def cleanup_completed_operations(self, max_age_seconds: int = 3600):
        """Drop finished operations whose end_time is older than *max_age_seconds*."""
        now = time.time()
        done_states = (AsyncOperationStatus.COMPLETED,
                       AsyncOperationStatus.FAILED,
                       AsyncOperationStatus.CANCELLED)
        stale = [
            op_id for op_id, op in self.operations.items()
            if op.status in done_states and op.end_time
            and (now - op.end_time) > max_age_seconds
        ]
        for op_id in stale:
            del self.operations[op_id]

    def close(self):
        """Shut down the worker pool, waiting for in-flight tasks to finish."""
        self.executor.shutdown(wait=True)
# 异步操作使用示例
async def async_example():
    """Demo driver: concurrent PUTs/GETs, one batch, then status and statistics."""
    print("\n=== HBase异步操作示例 ===")
    # Client backed by a 5-thread pool; connection is unused in the simulation.
    async_client = HBaseAsyncClient(None, max_workers=5)
    # Callback invoked when each operation finishes.
    def operation_callback(result: AsyncResult):
        print(f" 回调: 操作 {result.operation_id} {'成功' if result.success else '失败'}, 耗时 {result.execution_time:.4f}s")
    print("1. 异步PUT操作:")
    # Launch several PUT coroutines concurrently.
    put_tasks = []
    for i in range(5):
        task = async_client.put_async(
            table_name="async_test",
            row_key=f"row_{i}",
            column_family="data",
            qualifier="value",
            value=f"async_value_{i}",
            callback=operation_callback
        )
        put_tasks.append(task)
    # Await every PUT coroutine; gather preserves submission order.
    put_operation_ids = await asyncio.gather(*put_tasks)
    print(f" 启动了 {len(put_operation_ids)} 个PUT操作")
    # Give callbacks time to print.
    await asyncio.sleep(0.5)
    print("\n2. 异步GET操作:")
    # Launch several GET coroutines concurrently.
    get_tasks = []
    for i in range(3):
        task = async_client.get_async(
            table_name="async_test",
            row_key=f"row_{i}",
            column_family="data",
            qualifier="value",
            callback=operation_callback
        )
        get_tasks.append(task)
    get_operation_ids = await asyncio.gather(*get_tasks)
    print(f" 启动了 {len(get_operation_ids)} 个GET操作")
    await asyncio.sleep(0.3)
    print("\n3. 异步批量操作:")
    batch_operations = [
        {"type": "put", "row_key": "batch_row_1", "value": "batch_value_1"},
        {"type": "put", "row_key": "batch_row_2", "value": "batch_value_2"},
        {"type": "delete", "row_key": "batch_row_3"}
    ]
    batch_operation_id = await async_client.batch_async(
        operations=batch_operations,
        callback=operation_callback
    )
    print(f" 启动批量操作: {batch_operation_id}")
    await asyncio.sleep(0.2)
    # Inspect tracked operation states.
    print("\n4. 操作状态:")
    all_operations = async_client.get_all_operations()
    for operation in all_operations[-5:]:  # show the last five operations only
        print(f" {operation.operation_id}: {operation.operation_type.value} - {operation.status.value}")
    # Aggregate client statistics.
    stats = async_client.get_statistics()
    print(f"\n5. 异步客户端统计: {stats}")
    # Tidy up tracked operations and release the thread pool.
    async_client.cleanup_completed_operations()
    async_client.close()
# Run the demo when executed as a script.
if __name__ == "__main__":
    asyncio.run(async_example())
5.2 异步流处理
class AsyncStreamProcessor:
    """Batches a stream of operation dicts and submits them via HBaseAsyncClient.

    Two cooperating loops run under start_processing(): _process_stream()
    drains the input queue into batches, _handle_results() reports submitted
    batches. stop_processing() flips the shared flag; both loops poll with
    1-second timeouts so they notice the change.
    """
    def __init__(self, async_client: HBaseAsyncClient, batch_size: int = 100):
        self.async_client = async_client
        self.batch_size = batch_size
        # NOTE(review): queues are created outside a running event loop;
        # fine on Python 3.10+, may bind the wrong loop on older versions.
        self.processing_queue = asyncio.Queue()
        self.result_queue = asyncio.Queue()
        self.is_running = False
        self.processed_count = 0
        self.error_count = 0
    async def start_processing(self):
        """Run the consumer and reporter loops until stop_processing() is called."""
        self.is_running = True
        # Launch both loops concurrently.
        processing_task = asyncio.create_task(self._process_stream())
        result_task = asyncio.create_task(self._handle_results())
        await asyncio.gather(processing_task, result_task)
    async def add_operation(self, operation_data: Dict[str, Any]):
        """Enqueue one operation dict for batching."""
        await self.processing_queue.put(operation_data)
    async def _process_stream(self):
        """Accumulate queued items into batches and flush them."""
        batch = []
        while self.is_running:
            try:
                # Wait at most 1s so the loop can re-check is_running.
                operation_data = await asyncio.wait_for(
                    self.processing_queue.get(),
                    timeout=1.0
                )
                batch.append(operation_data)
                # Flush when the batch is full or the queue momentarily drains.
                if len(batch) >= self.batch_size or self.processing_queue.empty():
                    if batch:
                        await self._process_batch(batch)
                        batch = []
            except asyncio.TimeoutError:
                # Idle: flush whatever is pending.
                if batch:
                    await self._process_batch(batch)
                    batch = []
            except Exception as e:
                print(f"流处理错误: {e}")
                self.error_count += 1
    async def _process_batch(self, batch: List[Dict[str, Any]]):
        """Submit one batch through the async client and record the submission."""
        try:
            operation_id = await self.async_client.batch_async(
                operations=batch,
                callback=self._batch_callback
            )
            await self.result_queue.put({
                "operation_id": operation_id,
                "batch_size": len(batch),
                "timestamp": time.time()
            })
        except Exception as e:
            print(f"批次处理错误: {e}")
            self.error_count += 1
    def _batch_callback(self, result: AsyncResult):
        """Per-batch completion callback: tally processed items or errors."""
        if result.success:
            self.processed_count += len(result.result)
        else:
            self.error_count += 1
    async def _handle_results(self):
        """Report each submitted batch; polls so it can observe shutdown."""
        while self.is_running:
            try:
                result = await asyncio.wait_for(
                    self.result_queue.get(),
                    timeout=1.0
                )
                print(f"处理批次 {result['operation_id']}: {result['batch_size']} 个操作")
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"结果处理错误: {e}")
    def stop_processing(self):
        """Signal both loops to exit after their current 1s poll."""
        self.is_running = False
    def get_statistics(self) -> Dict[str, Any]:
        """Return processed/error counts plus queue depth and run state."""
        return {
            "processed_count": self.processed_count,
            "error_count": self.error_count,
            "queue_size": self.processing_queue.qsize(),
            "is_running": self.is_running
        }
# 流处理示例
async def stream_processing_example():
    """Demo: feed a stream of put ops through AsyncStreamProcessor in batches of 5."""
    print("\n=== 异步流处理示例 ===")
    async_client = HBaseAsyncClient(None, max_workers=10)
    stream_processor = AsyncStreamProcessor(async_client, batch_size=5)
    # Run the processor loops in the background.
    processing_task = asyncio.create_task(stream_processor.start_processing())
    # Simulate a data stream arriving over time.
    print("1. 模拟数据流:")
    for i in range(20):
        operation_data = {
            "type": "put",
            "row_key": f"stream_row_{i}",
            "value": f"stream_value_{i}"
        }
        await stream_processor.add_operation(operation_data)
        if i % 5 == 0:
            print(f" 已添加 {i+1} 个操作到流")
        await asyncio.sleep(0.1)  # simulated inter-arrival gap
    # Let queued batches drain.
    await asyncio.sleep(2)
    # Report what was processed.
    stats = stream_processor.get_statistics()
    print(f"\n2. 流处理统计: {stats}")
    # Stop the loops and release client resources.
    stream_processor.stop_processing()
    processing_task.cancel()
    async_client.close()
# Run the demo when executed as a script.
if __name__ == "__main__":
    asyncio.run(stream_processing_example())
6. 性能监控与调优
6.1 性能监控
class PerformanceMetric(Enum):
    """Kinds of performance metrics the monitor can record."""
    LATENCY = "latency"
    THROUGHPUT = "throughput"
    ERROR_RATE = "error_rate"
    MEMORY_USAGE = "memory_usage"
    CPU_USAGE = "cpu_usage"
    CONNECTION_COUNT = "connection_count"
# NOTE(review): `field` is required for the mutable default below, but the
# module's imports only bring in `dataclass`; import it here so the
# definition is self-contained and does not raise NameError.
from dataclasses import field

@dataclass
class MetricData:
    """A single performance-metric sample."""
    metric_type: "PerformanceMetric"  # which metric this sample belongs to
    value: float                      # measured value (units depend on the metric)
    timestamp: int                    # epoch millis when the sample was taken
    # Free-form labels (e.g. operation_type); default_factory gives every
    # instance its own dict instead of a shared mutable default.
    tags: Dict[str, str] = field(default_factory=dict)
class PerformanceMonitor:
    """Sliding-window collector for operation latency, throughput and error metrics."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.metrics: Dict[PerformanceMetric, List[MetricData]] = {}
        self.operation_times: List[float] = []
        self.error_count = 0
        self.total_operations = 0
        self._lock = threading.Lock()

    def record_operation(self, operation_type: str, execution_time: float, success: bool):
        """Record one operation outcome; successful timings feed the latency window."""
        with self._lock:
            self.total_operations += 1
            if not success:
                self.error_count += 1
            else:
                self.operation_times.append(execution_time)
                # Keep at most window_size successful timings.
                if len(self.operation_times) > self.window_size:
                    self.operation_times = self.operation_times[-self.window_size:]
            # Every operation, success or not, contributes a latency sample.
            self._add_metric(
                PerformanceMetric.LATENCY,
                execution_time,
                {"operation_type": operation_type, "success": str(success)}
            )

    def _add_metric(self, metric_type: PerformanceMetric, value: float, tags: Dict[str, str] = None):
        """Append a MetricData sample, trimming the series to window_size."""
        series = self.metrics.setdefault(metric_type, [])
        series.append(MetricData(
            metric_type=metric_type,
            value=value,
            timestamp=int(time.time() * 1000),
            tags=tags or {}
        ))
        if len(series) > self.window_size:
            self.metrics[metric_type] = series[-self.window_size:]

    def get_latency_stats(self) -> Dict[str, float]:
        """Return avg/min/max and p50/p95/p99 over the successful-operation window."""
        times = self.operation_times
        if not times:
            return {"avg": 0, "min": 0, "max": 0, "p50": 0, "p95": 0, "p99": 0}
        ordered = sorted(times)
        n = len(ordered)
        def pct(q):
            # Nearest-rank style index; int(n*q) < n for q < 1, so always valid.
            return round(ordered[int(n * q)], 4)
        return {
            "avg": round(sum(ordered) / n, 4),
            "min": round(ordered[0], 4),
            "max": round(ordered[-1], 4),
            "p50": pct(0.5),
            "p95": pct(0.95),
            "p99": pct(0.99),
        }

    def get_throughput(self, time_window_seconds: int = 60) -> float:
        """Operations per second over the trailing window, counted from stored samples."""
        cutoff = int(time.time() * 1000) - time_window_seconds * 1000
        recent = sum(
            sum(1 for m in series if m.timestamp >= cutoff)
            for series in self.metrics.values()
        )
        return round(recent / time_window_seconds, 2)

    def get_error_rate(self) -> float:
        """Failed operations as a percentage of all recorded operations."""
        if not self.total_operations:
            return 0.0
        return round(self.error_count / self.total_operations * 100, 2)

    def get_performance_summary(self) -> Dict[str, Any]:
        """Bundle latency stats, throughput, error rate and raw counters."""
        return {
            "latency_stats": self.get_latency_stats(),
            "throughput_ops_per_sec": self.get_throughput(),
            "error_rate_percent": self.get_error_rate(),
            "total_operations": self.total_operations,
            "error_count": self.error_count
        }

    def reset_metrics(self):
        """Clear every series, all timings, and both counters."""
        with self._lock:
            self.metrics.clear()
            self.operation_times.clear()
            self.error_count = 0
            self.total_operations = 0
class MonitoredHBaseClient:
    """Toy HBase client whose put/get/delete calls are timed by a PerformanceMonitor."""

    def __init__(self, connection):
        self.connection = connection
        self.monitor = PerformanceMonitor()
        self.data_store = {}  # in-memory stand-in for an HBase table

    @staticmethod
    def _cell_key(table_name: str, row_key: str, column_family: str, qualifier: str) -> str:
        """Flatten the cell coordinates into the flat key used by data_store."""
        return f"{table_name}:{row_key}:{column_family}:{qualifier}"

    def put(self, table_name: str, row_key: str, column_family: str, qualifier: str, value: Any) -> bool:
        """Store one cell value, recording latency and outcome on the monitor."""
        started = time.time()
        ok = True
        try:
            time.sleep(0.01)  # simulated network round-trip
            cell = self._cell_key(table_name, row_key, column_family, qualifier)
            self.data_store[cell] = {
                "value": value,
                "timestamp": int(time.time() * 1000)
            }
        except Exception as e:
            ok = False
            print(f"PUT操作失败: {e}")
        finally:
            self.monitor.record_operation("put", time.time() - started, ok)
        return ok

    def get(self, table_name: str, row_key: str, column_family: str, qualifier: str) -> Optional[Any]:
        """Fetch one cell (None when absent), recording latency on the monitor."""
        started = time.time()
        ok = True
        found = None
        try:
            time.sleep(0.005)  # simulated network round-trip
            cell = self._cell_key(table_name, row_key, column_family, qualifier)
            found = self.data_store.get(cell)
        except Exception as e:
            ok = False
            print(f"GET操作失败: {e}")
        finally:
            self.monitor.record_operation("get", time.time() - started, ok)
        return found

    def delete(self, table_name: str, row_key: str, column_family: str, qualifier: str) -> bool:
        """Remove one cell if present, recording latency on the monitor."""
        started = time.time()
        ok = True
        try:
            time.sleep(0.008)  # simulated network round-trip
            cell = self._cell_key(table_name, row_key, column_family, qualifier)
            self.data_store.pop(cell, None)
        except Exception as e:
            ok = False
            print(f"DELETE操作失败: {e}")
        finally:
            self.monitor.record_operation("delete", time.time() - started, ok)
        return ok

    def get_performance_report(self) -> Dict[str, Any]:
        """Return the monitor's aggregated performance summary."""
        return self.monitor.get_performance_summary()

    def reset_monitoring(self):
        """Drop all collected monitoring data."""
        self.monitor.reset_metrics()
# --- Performance monitoring demo --------------------------------------------
print("\n=== HBase性能监控示例 ===")
# Build a client whose operations are automatically timed by its monitor.
monitored_client = MonitoredHBaseClient(None)
print("1. 执行混合操作:")
# Mixed workload: every row is written, every 3rd read, every 10th deleted.
for i in range(100):
    monitored_client.put("test_table", f"row_{i}", "data", "value", f"test_value_{i}")
    if i % 3 == 0:
        monitored_client.get("test_table", f"row_{i}", "data", "value")
    if i % 10 == 0:
        monitored_client.delete("test_table", f"row_{i-5}", "data", "value")
    # Inject periodic failures so the error-rate statistics are exercised.
    if i % 20 == 0:
        try:
            raise Exception("模拟错误")
        # Fix: the original bare `except:` also swallowed SystemExit and
        # KeyboardInterrupt; catch Exception only.
        except Exception:
            monitored_client.monitor.record_operation("put", 0.1, False)
print(" 完成100个操作")
# Aggregated report: latency stats, throughput, error rate, counters.
print("\n2. 性能报告:")
report = monitored_client.get_performance_report()
for key, value in report.items():
    print(f" {key}: {value}")
# Percentile breakdown of successful-operation latencies.
print("\n3. 详细延迟统计:")
latency_stats = monitored_client.monitor.get_latency_stats()
for percentile, value in latency_stats.items():
    print(f" {percentile}: {value}s")
print(f"\n4. 当前吞吐量: {monitored_client.monitor.get_throughput()} 操作/秒")
print(f"5. 错误率: {monitored_client.monitor.get_error_rate()}%")
6.2 性能调优建议
class PerformanceTuner:
    """Turns PerformanceMonitor data into findings, a health score and a plan."""

    def __init__(self, monitor: PerformanceMonitor):
        self.monitor = monitor

    def analyze_performance(self) -> Dict[str, Any]:
        """Inspect the current summary; return findings plus an overall health score."""
        summary = self.monitor.get_performance_summary()
        findings = []
        latency = summary["latency_stats"]

        # Average latency above 100ms.
        if latency["avg"] > 0.1:
            findings.append({
                "issue": "高延迟",
                "description": f"平均延迟 {latency['avg']}s 过高",
                "suggestions": [
                    "检查网络连接质量",
                    "增加连接池大小",
                    "考虑使用批量操作",
                    "优化行键设计以减少热点"
                ]
            })

        # Throughput below 100 ops/sec.
        ops_per_sec = summary["throughput_ops_per_sec"]
        if ops_per_sec < 100:
            findings.append({
                "issue": "低吞吐量",
                "description": f"吞吐量 {ops_per_sec} ops/sec 过低",
                "suggestions": [
                    "增加并发连接数",
                    "使用异步操作",
                    "优化批量操作大小",
                    "检查RegionServer负载"
                ]
            })

        # Error rate above 5%.
        err_pct = summary["error_rate_percent"]
        if err_pct > 5:
            findings.append({
                "issue": "高错误率",
                "description": f"错误率 {err_pct}% 过高",
                "suggestions": [
                    "检查HBase集群状态",
                    "增加重试机制",
                    "检查网络稳定性",
                    "优化超时设置"
                ]
            })

        # Long tail: p99 far beyond the mean indicates jitter.
        if latency["p99"] > latency["avg"] * 5:
            findings.append({
                "issue": "延迟分布不均",
                "description": "P99延迟远高于平均延迟,存在性能抖动",
                "suggestions": [
                    "检查GC设置",
                    "优化RegionServer配置",
                    "检查磁盘I/O性能",
                    "考虑预分区策略"
                ]
            })

        return {
            "performance_summary": summary,
            "recommendations": findings,
            "overall_health": self._calculate_health_score(summary)
        }

    def _calculate_health_score(self, summary: Dict[str, Any]) -> Dict[str, Any]:
        """Score 0-100 from latency, throughput and error rate, with a grade label."""
        score = 100

        # Latency penalty.
        avg_latency = summary["latency_stats"]["avg"]
        if avg_latency > 0.1:
            score -= 20
        elif avg_latency > 0.05:
            score -= 10

        # Throughput penalty.
        ops_per_sec = summary["throughput_ops_per_sec"]
        if ops_per_sec < 50:
            score -= 30
        elif ops_per_sec < 100:
            score -= 15

        # Error-rate penalty.
        err_pct = summary["error_rate_percent"]
        if err_pct > 10:
            score -= 40
        elif err_pct > 5:
            score -= 20
        elif err_pct > 1:
            score -= 10

        # Map the numeric score onto a human-readable grade.
        if score >= 90:
            grade = "优秀"
        elif score >= 70:
            grade = "良好"
        elif score >= 50:
            grade = "一般"
        else:
            grade = "需要优化"

        return {"score": max(0, score), "level": grade}

    def generate_optimization_plan(self, analysis: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Order the findings into plan items; one shared priority from the health score."""
        health = analysis["overall_health"]["score"]
        if health < 50:
            priority = "高"
        elif health < 70:
            priority = "中"
        else:
            priority = "低"

        plan = []
        for order, finding in enumerate(analysis["recommendations"], start=1):
            plan.append({
                "priority": priority,
                "order": order,
                "issue": finding["issue"],
                "description": finding["description"],
                "action_items": finding["suggestions"],
                "estimated_impact": self._estimate_impact(finding["issue"]),
                "implementation_difficulty": self._estimate_difficulty(finding["issue"])
            })
        return plan

    def _estimate_impact(self, issue: str) -> str:
        """Rough expected impact of fixing the given issue ("未知" if unrecognized)."""
        return {
            "高延迟": "中等",
            "低吞吐量": "高",
            "高错误率": "高",
            "延迟分布不均": "中等"
        }.get(issue, "未知")

    def _estimate_difficulty(self, issue: str) -> str:
        """Rough implementation difficulty of the fix ("未知" if unrecognized)."""
        return {
            "高延迟": "中等",
            "低吞吐量": "中等",
            "高错误率": "低",
            "延迟分布不均": "高"
        }.get(issue, "未知")
# Performance tuning demo: analyze the monitoring data gathered above.
print("\n=== 性能调优分析示例 ===")
# Reuse the monitor populated by the earlier monitoring example.
tuner = PerformanceTuner(monitored_client.monitor)
# Run the analysis: summary, findings and an overall health score.
analysis = tuner.analyze_performance()
print("1. 性能分析结果:")
print(f" 健康评分: {analysis['overall_health']['score']} ({analysis['overall_health']['level']})")
print("\n2. 发现的问题:")
for rec in analysis["recommendations"]:
    print(f" 问题: {rec['issue']}")
    print(f" 描述: {rec['description']}")
    print(f" 建议: {', '.join(rec['suggestions'])}")
    print()
# Turn the findings into an ordered, prioritized optimization plan.
optimization_plan = tuner.generate_optimization_plan(analysis)
print("3. 优化计划:")
for plan_item in optimization_plan:
    print(f" 优先级: {plan_item['priority']} | 顺序: {plan_item['order']}")
    print(f" 问题: {plan_item['issue']}")
    print(f" 预期影响: {plan_item['estimated_impact']} | 实施难度: {plan_item['implementation_difficulty']}")
    print(f" 行动项: {', '.join(plan_item['action_items'])}")
    print()
总结
本章详细介绍了HBase的高级操作与客户端API,包括:
关键要点
- 批量操作:通过批量处理提高操作效率,支持多种执行模式
- 高级过滤器:实现复杂的数据筛选和查询逻辑
- 计数器操作:提供原子性的计数功能,支持分布式计数
- 协处理器:实现服务端数据处理,包括审计、安全和验证
- 异步操作:通过异步编程提高并发性能
- 性能监控:实时监控系统性能,及时发现问题
最佳实践
- 合理使用批量操作:根据数据量和网络条件选择合适的批次大小
- 优化过滤器使用:在服务端过滤数据,减少网络传输
- 谨慎使用协处理器:避免在协处理器中执行耗时操作
- 监控关键指标:持续监控延迟、吞吐量和错误率
- 异步处理大量数据:使用异步操作处理高并发场景
下一步学习
- HBase集群管理与运维
- HBase与其他大数据组件集成
- HBase性能调优深入实践
- HBase故障排除与恢复