1. HBase Integration with the Hadoop Ecosystem
1.1 Integration with HDFS
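HBase keeps all of its persistent state on HDFS: table data as HFiles under the root directory, write-ahead logs under WALs, and retired files under oldWALs and archive. The classes below model that layout in memory so the directory structure and file lifecycle can be walked through without a running cluster. For reference, a real deployment ties HBase to HDFS through a handful of hbase-site.xml properties; the sketch below lists the commonly used ones as a plain Python dict (the property names are standard, but the NameNode address and paths are placeholder assumptions).

# Hedged reference sketch: typical hbase-site.xml settings that point HBase at HDFS,
# shown as a Python dict. "namenode:8020" and the paths are placeholders only.
HBASE_HDFS_SETTINGS = {
    "hbase.rootdir": "hdfs://namenode:8020/hbase",       # root of all HBase data on HDFS
    "hbase.cluster.distributed": "true",                  # fully distributed mode
    "hbase.wal.dir": "hdfs://namenode:8020/hbase/WALs",   # optional separate WAL location
    "dfs.replication": "3",                               # HDFS block replication factor
}

def table_data_dir(namespace: str, table: str) -> str:
    """Directory under hbase.rootdir where a table's Region/HFile data lives."""
    return f"{HBASE_HDFS_SETTINGS['hbase.rootdir']}/data/{namespace}/{table}"

print(table_data_dir("default", "user_table"))  # hdfs://namenode:8020/hbase/data/default/user_table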
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Set
from datetime import datetime, timedelta
import threading
import time
import random
import json
class HDFSOperationType(Enum):
"""HDFS操作类型枚举"""
READ = "read"
WRITE = "write"
DELETE = "delete"
LIST = "list"
MKDIR = "mkdir"
COPY = "copy"
MOVE = "move"
class HDFSFileType(Enum):
"""HDFS文件类型枚举"""
HFILE = "hfile"
WAL = "wal"
SNAPSHOT = "snapshot"
COMPACTION = "compaction"
REGION_INFO = "region_info"
TABLE_INFO = "table_info"
@dataclass
class HDFSPath:
"""HDFS路径数据类"""
path: str
file_type: HDFSFileType
size_bytes: int = 0
replication_factor: int = 3
block_size: int = 134217728 # 128MB
owner: str = "hbase"
group: str = "hbase"
permissions: str = "755"
created_time: datetime = field(default_factory=datetime.now)
modified_time: datetime = field(default_factory=datetime.now)
accessed_time: datetime = field(default_factory=datetime.now)
@dataclass
class HDFSOperation:
"""HDFS操作数据类"""
operation_id: str
operation_type: HDFSOperationType
source_path: str
target_path: Optional[str] = None
file_type: Optional[HDFSFileType] = None
status: str = "pending" # pending, running, completed, failed
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
bytes_processed: int = 0
error_message: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
class HBaseHDFSIntegration:
"""HBase与HDFS集成管理器"""
def __init__(self, hdfs_root: str = "/hbase"):
self.hdfs_root = hdfs_root
self.file_system: Dict[str, HDFSPath] = {}
self.operations: Dict[str, HDFSOperation] = {}
self.operation_history: List[Dict[str, Any]] = []
self.replication_jobs: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
# 初始化HBase目录结构
self._initialize_hbase_directories()
def _initialize_hbase_directories(self):
"""初始化HBase目录结构"""
base_dirs = [
"/hbase",
"/hbase/data",
"/hbase/WALs",
"/hbase/oldWALs",
"/hbase/archive",
"/hbase/staging",
"/hbase/corrupt",
"/hbase/.tmp",
"/hbase/MasterProcWALs"
]
for dir_path in base_dirs:
self.file_system[dir_path] = HDFSPath(
path=dir_path,
file_type=HDFSFileType.TABLE_INFO,
size_bytes=0,
permissions="755"
)
print(f"HBase HDFS目录结构初始化完成: {len(base_dirs)} 个目录")
def create_table_directory(self, namespace: str, table_name: str) -> str:
"""创建表目录"""
table_path = f"{self.hdfs_root}/data/{namespace}/{table_name}"
# 创建表目录
self.file_system[table_path] = HDFSPath(
path=table_path,
file_type=HDFSFileType.TABLE_INFO,
permissions="755"
)
# 创建表信息文件
table_info_path = f"{table_path}/.tableinfo"
self.file_system[table_info_path] = HDFSPath(
path=table_info_path,
file_type=HDFSFileType.TABLE_INFO,
size_bytes=1024,
permissions="644"
)
print(f"表目录已创建: {table_path}")
return table_path
def create_region_directory(self, table_path: str, region_name: str) -> str:
"""创建Region目录"""
region_path = f"{table_path}/{region_name}"
# 创建Region目录
self.file_system[region_path] = HDFSPath(
path=region_path,
file_type=HDFSFileType.REGION_INFO,
permissions="755"
)
# 创建列族目录(示例)
cf_dirs = ["cf1", "cf2"]
for cf in cf_dirs:
cf_path = f"{region_path}/{cf}"
self.file_system[cf_path] = HDFSPath(
path=cf_path,
file_type=HDFSFileType.HFILE,
permissions="755"
)
print(f"Region目录已创建: {region_path}")
return region_path
def write_hfile(self, region_path: str, column_family: str,
hfile_name: str, size_bytes: int) -> str:
"""写入HFile"""
hfile_path = f"{region_path}/{column_family}/{hfile_name}"
operation_id = f"write_{int(time.time())}_{random.randint(1000, 9999)}"
operation = HDFSOperation(
operation_id=operation_id,
operation_type=HDFSOperationType.WRITE,
source_path=hfile_path,
file_type=HDFSFileType.HFILE,
start_time=datetime.now(),
bytes_processed=size_bytes
)
try:
# 模拟写入操作
time.sleep(0.1) # 模拟I/O延迟
# 创建HFile
self.file_system[hfile_path] = HDFSPath(
path=hfile_path,
file_type=HDFSFileType.HFILE,
size_bytes=size_bytes,
permissions="644"
)
operation.status = "completed"
operation.end_time = datetime.now()
print(f"HFile写入成功: {hfile_path} ({size_bytes} bytes)")
except Exception as e:
operation.status = "failed"
operation.error_message = str(e)
operation.end_time = datetime.now()
print(f"HFile写入失败: {e}")
with self._lock:
self.operations[operation_id] = operation
self._record_operation_history(operation)
return hfile_path
def write_wal(self, region_server: str, wal_name: str, entries: List[Dict[str, Any]]) -> str:
"""写入WAL"""
wal_path = f"{self.hdfs_root}/WALs/{region_server}/{wal_name}"
operation_id = f"wal_{int(time.time())}_{random.randint(1000, 9999)}"
wal_size = len(entries) * 100 # 估算WAL大小
operation = HDFSOperation(
operation_id=operation_id,
operation_type=HDFSOperationType.WRITE,
source_path=wal_path,
file_type=HDFSFileType.WAL,
start_time=datetime.now(),
bytes_processed=wal_size,
metadata={"entries_count": len(entries)}
)
try:
# 模拟WAL写入
time.sleep(0.05)
# 创建或更新WAL文件
if wal_path in self.file_system:
self.file_system[wal_path].size_bytes += wal_size
self.file_system[wal_path].modified_time = datetime.now()
else:
self.file_system[wal_path] = HDFSPath(
path=wal_path,
file_type=HDFSFileType.WAL,
size_bytes=wal_size,
permissions="644"
)
operation.status = "completed"
operation.end_time = datetime.now()
print(f"WAL写入成功: {wal_path} ({len(entries)} entries)")
except Exception as e:
operation.status = "failed"
operation.error_message = str(e)
operation.end_time = datetime.now()
print(f"WAL写入失败: {e}")
with self._lock:
self.operations[operation_id] = operation
self._record_operation_history(operation)
return wal_path
def compact_hfiles(self, region_path: str, column_family: str,
input_files: List[str], output_file: str) -> str:
"""压缩HFiles"""
output_path = f"{region_path}/{column_family}/{output_file}"
operation_id = f"compact_{int(time.time())}_{random.randint(1000, 9999)}"
# 计算输入文件总大小
total_input_size = 0
for input_file in input_files:
input_path = f"{region_path}/{column_family}/{input_file}"
if input_path in self.file_system:
total_input_size += self.file_system[input_path].size_bytes
# 压缩后大小通常会减少
output_size = int(total_input_size * 0.8)
operation = HDFSOperation(
operation_id=operation_id,
operation_type=HDFSOperationType.WRITE,
source_path=output_path,
file_type=HDFSFileType.COMPACTION,
start_time=datetime.now(),
bytes_processed=output_size,
metadata={
"input_files": input_files,
"input_size": total_input_size,
"compression_ratio": 0.8
}
)
try:
# 模拟压缩操作
time.sleep(0.2) # 压缩需要更多时间
# 创建压缩后的文件
self.file_system[output_path] = HDFSPath(
path=output_path,
file_type=HDFSFileType.HFILE,
size_bytes=output_size,
permissions="644"
)
# 删除输入文件
for input_file in input_files:
input_path = f"{region_path}/{column_family}/{input_file}"
if input_path in self.file_system:
del self.file_system[input_path]
operation.status = "completed"
operation.end_time = datetime.now()
print(f"HFile压缩完成: {output_path} (压缩比: 80%)")
except Exception as e:
operation.status = "failed"
operation.error_message = str(e)
operation.end_time = datetime.now()
print(f"HFile压缩失败: {e}")
with self._lock:
self.operations[operation_id] = operation
self._record_operation_history(operation)
return output_path
def create_snapshot(self, table_path: str, snapshot_name: str) -> str:
"""创建快照"""
snapshot_path = f"{self.hdfs_root}/archive/data/{snapshot_name}"
operation_id = f"snapshot_{int(time.time())}_{random.randint(1000, 9999)}"
# 计算表大小
table_size = 0
for path, hdfs_path in self.file_system.items():
if path.startswith(table_path) and hdfs_path.file_type == HDFSFileType.HFILE:
table_size += hdfs_path.size_bytes
operation = HDFSOperation(
operation_id=operation_id,
operation_type=HDFSOperationType.COPY,
source_path=table_path,
target_path=snapshot_path,
file_type=HDFSFileType.SNAPSHOT,
start_time=datetime.now(),
bytes_processed=table_size,
metadata={"snapshot_name": snapshot_name}
)
try:
# 模拟快照创建
time.sleep(0.3)
# 创建快照目录
self.file_system[snapshot_path] = HDFSPath(
path=snapshot_path,
file_type=HDFSFileType.SNAPSHOT,
size_bytes=table_size,
permissions="755"
)
operation.status = "completed"
operation.end_time = datetime.now()
print(f"快照创建成功: {snapshot_path} ({table_size} bytes)")
except Exception as e:
operation.status = "failed"
operation.error_message = str(e)
operation.end_time = datetime.now()
print(f"快照创建失败: {e}")
with self._lock:
self.operations[operation_id] = operation
self._record_operation_history(operation)
return snapshot_path
def archive_old_wals(self, region_server: str, max_age_hours: int = 24) -> List[str]:
"""归档旧WAL文件"""
archived_files = []
cutoff_time = datetime.now() - timedelta(hours=max_age_hours)
wal_dir = f"{self.hdfs_root}/WALs/{region_server}"
old_wal_dir = f"{self.hdfs_root}/oldWALs/{region_server}"
# 查找需要归档的WAL文件
for path, hdfs_path in list(self.file_system.items()):
if (path.startswith(wal_dir) and
hdfs_path.file_type == HDFSFileType.WAL and
hdfs_path.modified_time < cutoff_time):
# 移动到oldWALs目录
old_path = path.replace(wal_dir, old_wal_dir)
# 创建目标目录(如果不存在)
if old_wal_dir not in self.file_system:
self.file_system[old_wal_dir] = HDFSPath(
path=old_wal_dir,
file_type=HDFSFileType.WAL,
permissions="755"
)
# 移动文件
self.file_system[old_path] = hdfs_path
self.file_system[old_path].path = old_path
del self.file_system[path]
archived_files.append(old_path)
print(f"WAL归档完成: {len(archived_files)} 个文件")
return archived_files
def _record_operation_history(self, operation: HDFSOperation):
"""记录操作历史"""
duration = 0
if operation.start_time and operation.end_time:
duration = (operation.end_time - operation.start_time).total_seconds()
self.operation_history.append({
"operation_id": operation.operation_id,
"operation_type": operation.operation_type.value,
"file_type": operation.file_type.value if operation.file_type else None,
"status": operation.status,
"duration_seconds": duration,
"bytes_processed": operation.bytes_processed,
"timestamp": operation.start_time.isoformat() if operation.start_time else None
})
# 保留最近1000条记录
if len(self.operation_history) > 1000:
self.operation_history = self.operation_history[-1000:]
def get_storage_statistics(self) -> Dict[str, Any]:
"""获取存储统计信息"""
stats = {
"total_files": 0,
"total_size_bytes": 0,
"file_type_distribution": {},
"directory_sizes": {},
"replication_info": {
"total_replicated_size": 0,
"average_replication_factor": 0
}
}
total_replication_factor = 0
file_count = 0
for path, hdfs_path in self.file_system.items():
stats["total_files"] += 1
stats["total_size_bytes"] += hdfs_path.size_bytes
# 按文件类型统计
file_type = hdfs_path.file_type.value
if file_type not in stats["file_type_distribution"]:
stats["file_type_distribution"][file_type] = {"count": 0, "size_bytes": 0}
stats["file_type_distribution"][file_type]["count"] += 1
stats["file_type_distribution"][file_type]["size_bytes"] += hdfs_path.size_bytes
# 按目录统计
dir_path = "/".join(path.split("/")[:-1]) or "/" # top-level entries such as "/hbase" roll up to "/"
if dir_path not in stats["directory_sizes"]:
stats["directory_sizes"][dir_path] = 0
stats["directory_sizes"][dir_path] += hdfs_path.size_bytes
# 复制因子统计
if hdfs_path.size_bytes > 0: # 只统计实际文件
total_replication_factor += hdfs_path.replication_factor
file_count += 1
stats["replication_info"]["total_replicated_size"] += (
hdfs_path.size_bytes * hdfs_path.replication_factor
)
if file_count > 0:
stats["replication_info"]["average_replication_factor"] = (
total_replication_factor / file_count
)
return stats
def get_operation_statistics(self) -> Dict[str, Any]:
"""获取操作统计信息"""
if not self.operation_history:
return {"total_operations": 0}
stats = {
"total_operations": len(self.operation_history),
"operation_type_distribution": {},
"status_distribution": {},
"performance_metrics": {
"average_duration": 0,
"total_bytes_processed": 0,
"average_throughput_mbps": 0
},
"recent_operations": self.operation_history[-10:]
}
total_duration = 0
total_bytes = 0
for op in self.operation_history:
# 按操作类型统计
op_type = op["operation_type"]
if op_type not in stats["operation_type_distribution"]:
stats["operation_type_distribution"][op_type] = 0
stats["operation_type_distribution"][op_type] += 1
# 按状态统计
status = op["status"]
if status not in stats["status_distribution"]:
stats["status_distribution"][status] = 0
stats["status_distribution"][status] += 1
# 性能统计
total_duration += op["duration_seconds"]
total_bytes += op["bytes_processed"]
if len(self.operation_history) > 0:
stats["performance_metrics"]["average_duration"] = (
total_duration / len(self.operation_history)
)
stats["performance_metrics"]["total_bytes_processed"] = total_bytes
if total_duration > 0:
throughput_bps = total_bytes / total_duration
stats["performance_metrics"]["average_throughput_mbps"] = (
throughput_bps / (1024 * 1024)
)
return stats
def cleanup_old_files(self, max_age_days: int = 30) -> List[str]:
"""清理旧文件"""
cleaned_files = []
cutoff_time = datetime.now() - timedelta(days=max_age_days)
# 清理oldWALs目录中的旧文件
old_wal_prefix = f"{self.hdfs_root}/oldWALs"
for path, hdfs_path in list(self.file_system.items()):
if (path.startswith(old_wal_prefix) and
hdfs_path.file_type == HDFSFileType.WAL and
hdfs_path.modified_time < cutoff_time):
del self.file_system[path]
cleaned_files.append(path)
print(f"文件清理完成: 删除了 {len(cleaned_files)} 个旧文件")
return cleaned_files
# HBase与HDFS集成示例
print("\n=== HBase与HDFS集成示例 ===")
# 创建HDFS集成管理器
hdfs_integration = HBaseHDFSIntegration()
print("1. 创建表和Region目录:")
table_path = hdfs_integration.create_table_directory("default", "user_table")
region_path = hdfs_integration.create_region_directory(table_path, "region_001")
print("\n2. 写入HFile:")
for i in range(3):
hfile_name = f"hfile_{i:03d}"
size = random.randint(10000, 50000)
hdfs_integration.write_hfile(region_path, "cf1", hfile_name, size)
print("\n3. 写入WAL:")
wal_entries = [
{"row": "user001", "cf": "cf1", "qualifier": "name", "value": "Alice"},
{"row": "user002", "cf": "cf1", "qualifier": "name", "value": "Bob"},
{"row": "user003", "cf": "cf1", "qualifier": "name", "value": "Charlie"}
]
hdfs_integration.write_wal("rs001", "wal_001.log", wal_entries)
print("\n4. 压缩HFiles:")
input_files = ["hfile_000", "hfile_001"]
hdfs_integration.compact_hfiles(region_path, "cf1", input_files, "compacted_001")
print("\n5. 创建快照:")
hdfs_integration.create_snapshot(table_path, "user_table_snapshot_20240101")
print("\n6. 归档WAL文件:")
archived = hdfs_integration.archive_old_wals("rs001", max_age_hours=1)
print("\n7. 存储统计:")
storage_stats = hdfs_integration.get_storage_statistics()
print(f" 总文件数: {storage_stats['total_files']}")
print(f" 总大小: {storage_stats['total_size_bytes']:,} bytes")
print(f" 平均复制因子: {storage_stats['replication_info']['average_replication_factor']:.1f}")
print("\n 文件类型分布:")
for file_type, info in storage_stats['file_type_distribution'].items():
print(f" {file_type}: {info['count']} 文件, {info['size_bytes']:,} bytes")
print("\n8. 操作统计:")
op_stats = hdfs_integration.get_operation_statistics()
print(f" 总操作数: {op_stats['total_operations']}")
print(f" 平均耗时: {op_stats['performance_metrics']['average_duration']:.3f} 秒")
print(f" 平均吞吐量: {op_stats['performance_metrics']['average_throughput_mbps']:.2f} MB/s")
print("\n 操作类型分布:")
for op_type, count in op_stats['operation_type_distribution'].items():
print(f" {op_type}: {count} 次")
1.2 Integration with MapReduce
class MapReduceJobType(Enum):
"""MapReduce作业类型枚举"""
BULK_LOAD = "bulk_load"
EXPORT = "export"
IMPORT = "import"
COMPACTION = "compaction"
BACKUP = "backup"
ANALYSIS = "analysis"
ETL = "etl"
class JobStatus(Enum):
"""作业状态枚举"""
SUBMITTED = "submitted"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
KILLED = "killed"
@dataclass
class MapReduceJob:
"""MapReduce作业数据类"""
job_id: str
job_name: str
job_type: MapReduceJobType
input_path: str
output_path: str
status: JobStatus = JobStatus.SUBMITTED
submit_time: datetime = field(default_factory=datetime.now)
start_time: Optional[datetime] = None
finish_time: Optional[datetime] = None
map_tasks: int = 0
reduce_tasks: int = 0
map_progress: float = 0.0
reduce_progress: float = 0.0
bytes_read: int = 0
bytes_written: int = 0
records_read: int = 0
records_written: int = 0
error_message: Optional[str] = None
configuration: Dict[str, str] = field(default_factory=dict)
class HBaseMapReduceIntegration:
"""HBase与MapReduce集成管理器"""
def __init__(self, hdfs_integration: HBaseHDFSIntegration):
self.hdfs_integration = hdfs_integration
self.jobs: Dict[str, MapReduceJob] = {}
self.job_history: List[Dict[str, Any]] = []
self.running_jobs: Set[str] = set()
self._lock = threading.Lock()
def submit_bulk_load_job(self, input_path: str, table_name: str,
column_family: str) -> str:
"""提交批量加载作业"""
job_id = f"bulkload_{int(time.time())}_{random.randint(1000, 9999)}"
output_path = f"/tmp/hbase_bulkload/{job_id}"
job = MapReduceJob(
job_id=job_id,
job_name=f"BulkLoad-{table_name}",
job_type=MapReduceJobType.BULK_LOAD,
input_path=input_path,
output_path=output_path,
map_tasks=4,
reduce_tasks=2,
configuration={
"hbase.table.name": table_name,
"hbase.column.family": column_family,
"mapreduce.job.reduces": "2",
"mapreduce.map.memory.mb": "2048",
"mapreduce.reduce.memory.mb": "4096"
}
)
with self._lock:
self.jobs[job_id] = job
# 启动作业执行线程
job_thread = threading.Thread(target=self._execute_bulk_load_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"批量加载作业已提交: {job_id}")
return job_id
def _execute_bulk_load_job(self, job_id: str):
"""执行批量加载作业"""
job = self.jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = JobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行批量加载作业: {job_id}")
# 模拟Map阶段
for progress in range(0, 101, 10):
job.map_progress = progress
time.sleep(0.1)
if progress == 100:
print(f" Map阶段完成: {job_id}")
# 模拟Reduce阶段
for progress in range(0, 101, 20):
job.reduce_progress = progress
time.sleep(0.1)
if progress == 100:
print(f" Reduce阶段完成: {job_id}")
# 模拟生成HFiles
table_path = f"/hbase/data/default/{job.configuration['hbase.table.name']}"
region_path = f"{table_path}/region_001"
cf = job.configuration['hbase.column.family']
# 生成多个HFile
total_records = random.randint(10000, 50000)
hfile_count = 3
total_bytes_written = 0
for i in range(hfile_count):
hfile_name = f"bulkload_{job_id}_{i:03d}"
hfile_size = random.randint(1000000, 5000000) # 1-5MB
total_bytes_written += hfile_size
self.hdfs_integration.write_hfile(region_path, cf, hfile_name, hfile_size)
# 更新作业统计
job.records_read = total_records
job.records_written = total_records
job.bytes_read = total_records * 100 # 估算
job.bytes_written = total_bytes_written # consistent with the HFiles actually written
job.status = JobStatus.SUCCEEDED
job.finish_time = datetime.now()
print(f"批量加载作业完成: {job_id} (处理 {total_records:,} 条记录)")
except Exception as e:
job.status = JobStatus.FAILED
job.error_message = str(e)
job.finish_time = datetime.now()
print(f"批量加载作业失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def submit_export_job(self, table_name: str, output_path: str,
start_row: str = None, end_row: str = None) -> str:
"""提交导出作业"""
job_id = f"export_{int(time.time())}_{random.randint(1000, 9999)}"
job = MapReduceJob(
job_id=job_id,
job_name=f"Export-{table_name}",
job_type=MapReduceJobType.EXPORT,
input_path=f"/hbase/data/default/{table_name}",
output_path=output_path,
map_tasks=6,
reduce_tasks=0, # Map-only作业
configuration={
"hbase.table.name": table_name,
"hbase.export.startrow": start_row or "",
"hbase.export.endrow": end_row or "",
"mapreduce.job.reduces": "0",
"mapreduce.map.memory.mb": "1024"
}
)
with self._lock:
self.jobs[job_id] = job
# 启动作业执行线程
job_thread = threading.Thread(target=self._execute_export_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"导出作业已提交: {job_id}")
return job_id
def _execute_export_job(self, job_id: str):
"""执行导出作业"""
job = self.jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = JobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行导出作业: {job_id}")
# 模拟Map-only作业
for progress in range(0, 101, 5):
job.map_progress = progress
time.sleep(0.05)
# 模拟导出数据
exported_records = random.randint(50000, 200000)
exported_size = exported_records * 150 # 估算每条记录150字节
# 创建导出文件
export_file = f"{job.output_path}/part-m-00000"
self.hdfs_integration.file_system[export_file] = HDFSPath(
path=export_file,
file_type=HDFSFileType.TABLE_INFO,
size_bytes=exported_size,
permissions="644"
)
# 更新作业统计
job.records_read = exported_records
job.records_written = exported_records
job.bytes_read = exported_records * 100
job.bytes_written = exported_size
job.status = JobStatus.SUCCEEDED
job.finish_time = datetime.now()
print(f"导出作业完成: {job_id} (导出 {exported_records:,} 条记录)")
except Exception as e:
job.status = JobStatus.FAILED
job.error_message = str(e)
job.finish_time = datetime.now()
print(f"导出作业失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def submit_analysis_job(self, table_name: str, analysis_type: str,
output_path: str) -> str:
"""提交分析作业"""
job_id = f"analysis_{int(time.time())}_{random.randint(1000, 9999)}"
job = MapReduceJob(
job_id=job_id,
job_name=f"Analysis-{table_name}-{analysis_type}",
job_type=MapReduceJobType.ANALYSIS,
input_path=f"/hbase/data/default/{table_name}",
output_path=output_path,
map_tasks=8,
reduce_tasks=4,
configuration={
"hbase.table.name": table_name,
"analysis.type": analysis_type,
"mapreduce.job.reduces": "4",
"mapreduce.map.memory.mb": "2048",
"mapreduce.reduce.memory.mb": "4096"
}
)
with self._lock:
self.jobs[job_id] = job
# 启动作业执行线程
job_thread = threading.Thread(target=self._execute_analysis_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"分析作业已提交: {job_id}")
return job_id
def _execute_analysis_job(self, job_id: str):
"""执行分析作业"""
job = self.jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = JobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行分析作业: {job_id}")
# 模拟Map阶段
for progress in range(0, 101, 8):
job.map_progress = progress
time.sleep(0.1)
# 模拟Reduce阶段
for progress in range(0, 101, 12):
job.reduce_progress = progress
time.sleep(0.15)
# 模拟分析结果
analysis_type = job.configuration['analysis.type']
processed_records = random.randint(100000, 500000)
if analysis_type == "count":
result_data = {"total_count": processed_records}
elif analysis_type == "aggregation":
result_data = {
"sum": random.randint(1000000, 10000000),
"avg": random.uniform(100, 1000),
"min": random.randint(1, 100),
"max": random.randint(1000, 10000)
}
else:
result_data = {"processed_records": processed_records}
# 创建结果文件
result_file = f"{job.output_path}/part-r-00000"
result_size = len(json.dumps(result_data).encode())
self.hdfs_integration.file_system[result_file] = HDFSPath(
path=result_file,
file_type=HDFSFileType.TABLE_INFO,
size_bytes=result_size,
permissions="644"
)
# 更新作业统计
job.records_read = processed_records
job.records_written = len(result_data)
job.bytes_read = processed_records * 100
job.bytes_written = result_size
job.status = JobStatus.SUCCEEDED
job.finish_time = datetime.now()
print(f"分析作业完成: {job_id} (分析 {processed_records:,} 条记录)")
print(f" 分析结果: {result_data}")
except Exception as e:
job.status = JobStatus.FAILED
job.error_message = str(e)
job.finish_time = datetime.now()
print(f"分析作业失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def kill_job(self, job_id: str) -> bool:
"""终止作业"""
if job_id not in self.jobs:
return False
job = self.jobs[job_id]
if job.status == JobStatus.RUNNING:
job.status = JobStatus.KILLED
job.finish_time = datetime.now()
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
print(f"作业已终止: {job_id}")
return True
return False
def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
"""获取作业状态"""
job = self.jobs.get(job_id)
if not job:
return None
duration = 0
if job.start_time:
end_time = job.finish_time or datetime.now()
duration = (end_time - job.start_time).total_seconds()
return {
"job_id": job.job_id,
"job_name": job.job_name,
"job_type": job.job_type.value,
"status": job.status.value,
"submit_time": job.submit_time.isoformat(),
"start_time": job.start_time.isoformat() if job.start_time else None,
"finish_time": job.finish_time.isoformat() if job.finish_time else None,
"duration_seconds": duration,
"map_progress": job.map_progress,
"reduce_progress": job.reduce_progress,
"map_tasks": job.map_tasks,
"reduce_tasks": job.reduce_tasks,
"records_read": job.records_read,
"records_written": job.records_written,
"bytes_read": job.bytes_read,
"bytes_written": job.bytes_written,
"error_message": job.error_message
}
def list_jobs(self, status: JobStatus = None, job_type: MapReduceJobType = None) -> List[Dict[str, Any]]:
"""列出作业"""
jobs = []
for job in self.jobs.values():
if status and job.status != status:
continue
if job_type and job.job_type != job_type:
continue
job_info = self.get_job_status(job.job_id)
if job_info:
jobs.append(job_info)
return sorted(jobs, key=lambda x: x["submit_time"], reverse=True)
def _record_job_history(self, job: MapReduceJob):
"""记录作业历史"""
duration = 0
if job.start_time and job.finish_time:
duration = (job.finish_time - job.start_time).total_seconds()
self.job_history.append({
"job_id": job.job_id,
"job_name": job.job_name,
"job_type": job.job_type.value,
"status": job.status.value,
"duration_seconds": duration,
"records_processed": job.records_read,
"submit_time": job.submit_time.isoformat()
})
# 保留最近100条记录
if len(self.job_history) > 100:
self.job_history = self.job_history[-100:]
def get_job_statistics(self) -> Dict[str, Any]:
"""获取作业统计信息"""
if not self.job_history:
return {"total_jobs": 0}
stats = {
"total_jobs": len(self.job_history),
"running_jobs": len(self.running_jobs),
"job_type_distribution": {},
"status_distribution": {},
"performance_metrics": {
"average_duration": 0,
"total_records_processed": 0,
"success_rate": 0
}
}
total_duration = 0
total_records = 0
successful_jobs = 0
for job_record in self.job_history:
# 按作业类型统计
job_type = job_record["job_type"]
if job_type not in stats["job_type_distribution"]:
stats["job_type_distribution"][job_type] = 0
stats["job_type_distribution"][job_type] += 1
# 按状态统计
status = job_record["status"]
if status not in stats["status_distribution"]:
stats["status_distribution"][status] = 0
stats["status_distribution"][status] += 1
# 性能统计
total_duration += job_record["duration_seconds"]
total_records += job_record["records_processed"]
if status == "succeeded":
successful_jobs += 1
if len(self.job_history) > 0:
stats["performance_metrics"]["average_duration"] = (
total_duration / len(self.job_history)
)
stats["performance_metrics"]["total_records_processed"] = total_records
stats["performance_metrics"]["success_rate"] = (
successful_jobs / len(self.job_history)
)
return stats
# HBase与MapReduce集成示例
print("\n=== HBase与MapReduce集成示例 ===")
# 创建MapReduce集成管理器
mr_integration = HBaseMapReduceIntegration(hdfs_integration)
print("1. 提交批量加载作业:")
bulk_job_id = mr_integration.submit_bulk_load_job(
input_path="/data/input/users.csv",
table_name="user_table",
column_family="cf1"
)
print("\n2. 提交导出作业:")
export_job_id = mr_integration.submit_export_job(
table_name="user_table",
output_path="/data/export/users",
start_row="user001",
end_row="user999"
)
print("\n3. 提交分析作业:")
analysis_job_id = mr_integration.submit_analysis_job(
table_name="user_table",
analysis_type="count",
output_path="/data/analysis/user_count"
)
# 等待作业完成
time.sleep(3)
print("\n4. 检查作业状态:")
for job_id in [bulk_job_id, export_job_id, analysis_job_id]:
status = mr_integration.get_job_status(job_id)
if status:
print(f" {status['job_name']}:")
print(f" 状态: {status['status']}")
print(f" 耗时: {status['duration_seconds']:.1f} 秒")
print(f" 处理记录: {status['records_read']:,}")
if status['map_tasks'] > 0:
print(f" Map进度: {status['map_progress']:.0f}%")
if status['reduce_tasks'] > 0:
print(f" Reduce进度: {status['reduce_progress']:.0f}%")
print()
print("\n5. 作业统计:")
job_stats = mr_integration.get_job_statistics()
print(f" 总作业数: {job_stats['total_jobs']}")
print(f" 运行中作业: {job_stats['running_jobs']}")
print(f" 成功率: {job_stats['performance_metrics']['success_rate']:.1%}")
print(f" 平均耗时: {job_stats['performance_metrics']['average_duration']:.1f} 秒")
print(f" 总处理记录: {job_stats['performance_metrics']['total_records_processed']:,}")
print("\n 作业类型分布:")
for job_type, count in job_stats['job_type_distribution'].items():
print(f" {job_type}: {count} 个")
print("\n 状态分布:")
for status, count in job_stats['status_distribution'].items():
print(f" {status}: {count} 个")
2. HBase Integration with Stream Processing Systems
2.1 Integration with Kafka
class KafkaMessageType(Enum):
"""Kafka消息类型枚举"""
PUT = "put"
DELETE = "delete"
INCREMENT = "increment"
BATCH = "batch"
SCHEMA_CHANGE = "schema_change"
class ConsumerStatus(Enum):
"""消费者状态枚举"""
STOPPED = "stopped"
RUNNING = "running"
PAUSED = "paused"
ERROR = "error"
@dataclass
class KafkaMessage:
"""Kafka消息数据类"""
topic: str
partition: int
offset: int
key: str
value: Dict[str, Any]
message_type: KafkaMessageType
timestamp: datetime = field(default_factory=datetime.now)
headers: Dict[str, str] = field(default_factory=dict)
@dataclass
class KafkaConsumerConfig:
"""Kafka消费者配置数据类"""
consumer_id: str
group_id: str
topics: List[str]
bootstrap_servers: List[str]
auto_offset_reset: str = "latest" # earliest, latest
enable_auto_commit: bool = True
max_poll_records: int = 500
session_timeout_ms: int = 30000
heartbeat_interval_ms: int = 3000
batch_size: int = 100
flush_interval_ms: int = 5000
class HBaseKafkaIntegration:
"""HBase与Kafka集成管理器"""
def __init__(self):
self.consumers: Dict[str, Dict[str, Any]] = {}
self.producers: Dict[str, Dict[str, Any]] = {}
self.message_queue: Dict[str, List[KafkaMessage]] = {}
self.consumer_stats: Dict[str, Dict[str, Any]] = {}
self.producer_stats: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
# 模拟Kafka主题
self._initialize_topics()
def _initialize_topics(self):
"""初始化Kafka主题"""
topics = [
"hbase-puts",
"hbase-deletes",
"hbase-increments",
"hbase-batch-operations",
"hbase-schema-changes",
"hbase-cdc" # Change Data Capture
]
for topic in topics:
self.message_queue[topic] = []
print(f"Kafka主题初始化完成: {topics}")
def create_producer(self, producer_id: str, bootstrap_servers: List[str]) -> str:
"""创建Kafka生产者"""
producer_config = {
"producer_id": producer_id,
"bootstrap_servers": bootstrap_servers,
"acks": "all",
"retries": 3,
"batch_size": 16384,
"linger_ms": 1,
"buffer_memory": 33554432,
"compression_type": "snappy"
}
with self._lock:
self.producers[producer_id] = {
"config": producer_config,
"status": "active",
"created_time": datetime.now(),
"messages_sent": 0,
"bytes_sent": 0,
"errors": 0
}
self.producer_stats[producer_id] = {
"messages_per_second": 0,
"bytes_per_second": 0,
"error_rate": 0,
"last_send_time": None
}
print(f"Kafka生产者已创建: {producer_id}")
return producer_id
def send_message(self, producer_id: str, topic: str, key: str,
message_type: KafkaMessageType, data: Dict[str, Any]) -> bool:
"""发送消息到Kafka"""
if producer_id not in self.producers:
print(f"生产者不存在: {producer_id}")
return False
if topic not in self.message_queue:
print(f"主题不存在: {topic}")
return False
try:
# 创建消息
message = KafkaMessage(
topic=topic,
partition=random.randint(0, 2), # 模拟3个分区
offset=len(self.message_queue[topic]),
key=key,
value=data,
message_type=message_type,
headers={
"producer_id": producer_id,
"content_type": "application/json"
}
)
# 添加到消息队列
with self._lock:
self.message_queue[topic].append(message)
# 更新生产者统计
producer_info = self.producers[producer_id]
producer_info["messages_sent"] += 1
producer_info["bytes_sent"] += len(json.dumps(data).encode())
stats = self.producer_stats[producer_id]
stats["last_send_time"] = datetime.now()
print(f"消息已发送: {topic} - {key} ({message_type.value})")
return True
except Exception as e:
with self._lock:
self.producers[producer_id]["errors"] += 1
print(f"消息发送失败: {e}")
return False
def create_consumer(self, config: KafkaConsumerConfig) -> str:
"""创建Kafka消费者"""
consumer_info = {
"config": config,
"status": ConsumerStatus.STOPPED,
"created_time": datetime.now(),
"last_poll_time": None,
"messages_consumed": 0,
"bytes_consumed": 0,
"errors": 0,
"current_offsets": {topic: 0 for topic in config.topics},
"committed_offsets": {topic: 0 for topic in config.topics}
}
with self._lock:
self.consumers[config.consumer_id] = consumer_info
self.consumer_stats[config.consumer_id] = {
"messages_per_second": 0,
"bytes_per_second": 0,
"lag": 0,
"error_rate": 0
}
print(f"Kafka消费者已创建: {config.consumer_id}")
return config.consumer_id
def start_consumer(self, consumer_id: str, message_handler) -> bool:
"""启动Kafka消费者"""
if consumer_id not in self.consumers:
print(f"消费者不存在: {consumer_id}")
return False
consumer_info = self.consumers[consumer_id]
if consumer_info["status"] == ConsumerStatus.RUNNING:
print(f"消费者已在运行: {consumer_id}")
return True
consumer_info["status"] = ConsumerStatus.RUNNING
# 启动消费者线程
consumer_thread = threading.Thread(
target=self._consume_messages,
args=(consumer_id, message_handler)
)
consumer_thread.daemon = True
consumer_thread.start()
print(f"Kafka消费者已启动: {consumer_id}")
return True
def _consume_messages(self, consumer_id: str, message_handler):
"""消费消息"""
consumer_info = self.consumers[consumer_id]
config = consumer_info["config"]
while consumer_info["status"] == ConsumerStatus.RUNNING:
try:
# 模拟轮询消息
messages_batch = []
for topic in config.topics:
if topic in self.message_queue:
current_offset = consumer_info["current_offsets"][topic]
available_messages = self.message_queue[topic][current_offset:]
# 限制批次大小
batch_messages = available_messages[:config.max_poll_records]
messages_batch.extend(batch_messages)
# 更新偏移量
consumer_info["current_offsets"][topic] += len(batch_messages)
if messages_batch:
# 处理消息批次
for message in messages_batch:
try:
message_handler(message)
with self._lock:
consumer_info["messages_consumed"] += 1
consumer_info["bytes_consumed"] += len(
json.dumps(message.value).encode()
)
except Exception as e:
print(f"消息处理失败: {e}")
with self._lock:
consumer_info["errors"] += 1
# 提交偏移量
if config.enable_auto_commit:
for topic in config.topics:
consumer_info["committed_offsets"][topic] = (
consumer_info["current_offsets"][topic]
)
consumer_info["last_poll_time"] = datetime.now()
time.sleep(config.flush_interval_ms / 1000.0)
except Exception as e:
print(f"消费者错误: {consumer_id} - {e}")
consumer_info["status"] = ConsumerStatus.ERROR
with self._lock:
consumer_info["errors"] += 1
break
def stop_consumer(self, consumer_id: str) -> bool:
"""停止Kafka消费者"""
if consumer_id not in self.consumers:
return False
consumer_info = self.consumers[consumer_id]
consumer_info["status"] = ConsumerStatus.STOPPED
print(f"Kafka消费者已停止: {consumer_id}")
return True
def get_consumer_lag(self, consumer_id: str) -> Dict[str, int]:
"""获取消费者延迟"""
if consumer_id not in self.consumers:
return {}
consumer_info = self.consumers[consumer_id]
lag_info = {}
for topic in consumer_info["config"].topics:
if topic in self.message_queue:
latest_offset = len(self.message_queue[topic])
committed_offset = consumer_info["committed_offsets"][topic]
lag_info[topic] = latest_offset - committed_offset
return lag_info
def get_integration_statistics(self) -> Dict[str, Any]:
"""获取集成统计信息"""
stats = {
"producers": {
"total_count": len(self.producers),
"total_messages_sent": 0,
"total_bytes_sent": 0,
"total_errors": 0
},
"consumers": {
"total_count": len(self.consumers),
"running_count": 0,
"total_messages_consumed": 0,
"total_bytes_consumed": 0,
"total_errors": 0,
"total_lag": 0
},
"topics": {
"total_count": len(self.message_queue),
"message_distribution": {}
}
}
# 生产者统计
for producer_info in self.producers.values():
stats["producers"]["total_messages_sent"] += producer_info["messages_sent"]
stats["producers"]["total_bytes_sent"] += producer_info["bytes_sent"]
stats["producers"]["total_errors"] += producer_info["errors"]
# 消费者统计
for consumer_info in self.consumers.values():
if consumer_info["status"] == ConsumerStatus.RUNNING:
stats["consumers"]["running_count"] += 1
stats["consumers"]["total_messages_consumed"] += consumer_info["messages_consumed"]
stats["consumers"]["total_bytes_consumed"] += consumer_info["bytes_consumed"]
stats["consumers"]["total_errors"] += consumer_info["errors"]
# 主题统计
for topic, messages in self.message_queue.items():
stats["topics"]["message_distribution"][topic] = len(messages)
# 计算总延迟
for consumer_id in self.consumers.keys():
lag_info = self.get_consumer_lag(consumer_id)
stats["consumers"]["total_lag"] += sum(lag_info.values())
return stats
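# The create_producer / send_message pair above only simulates a producer. A hedged
# real-world counterpart with the kafka-python package (an assumption, not used
# elsewhere in this chapter): JSON-encode the mutation and send it to the same topic.
def send_put_with_kafka_python(row_key: str, mutation: dict) -> None:
    # Lazy import so the in-memory simulation keeps working without kafka-python installed.
    from kafka import KafkaProducer  # pip install kafka-python
    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],
        acks="all",
        retries=3,
        linger_ms=5,
        key_serializer=str.encode,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send("hbase-puts", key=row_key, value=mutation)
    producer.flush()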
class HBaseChangeDataCapture:
"""HBase变更数据捕获"""
def __init__(self, kafka_integration: HBaseKafkaIntegration):
self.kafka_integration = kafka_integration
self.cdc_enabled = False
self.monitored_tables: Set[str] = set()
self.change_log: List[Dict[str, Any]] = []
self._lock = threading.Lock()
def enable_cdc(self, table_name: str, producer_id: str) -> bool:
"""启用表的CDC"""
with self._lock:
self.monitored_tables.add(table_name)
print(f"CDC已启用: {table_name}")
return True
def capture_put_operation(self, table_name: str, row_key: str,
column_family: str, qualifier: str,
value: Any, producer_id: str) -> bool:
"""捕获PUT操作"""
if table_name not in self.monitored_tables:
return False
change_data = {
"operation": "PUT",
"table": table_name,
"row_key": row_key,
"column_family": column_family,
"qualifier": qualifier,
"value": value,
"timestamp": datetime.now().isoformat()
}
# 发送到Kafka
success = self.kafka_integration.send_message(
producer_id=producer_id,
topic="hbase-cdc",
key=f"{table_name}:{row_key}",
message_type=KafkaMessageType.PUT,
data=change_data
)
if success:
with self._lock:
self.change_log.append(change_data)
# 保留最近1000条记录
if len(self.change_log) > 1000:
self.change_log = self.change_log[-1000:]
return success
def capture_delete_operation(self, table_name: str, row_key: str,
producer_id: str) -> bool:
"""捕获DELETE操作"""
if table_name not in self.monitored_tables:
return False
change_data = {
"operation": "DELETE",
"table": table_name,
"row_key": row_key,
"timestamp": datetime.now().isoformat()
}
# 发送到Kafka
success = self.kafka_integration.send_message(
producer_id=producer_id,
topic="hbase-cdc",
key=f"{table_name}:{row_key}",
message_type=KafkaMessageType.DELETE,
data=change_data
)
if success:
with self._lock:
self.change_log.append(change_data)
if len(self.change_log) > 1000:
self.change_log = self.change_log[-1000:]
return success
# HBase与Kafka集成示例
print("\n=== HBase与Kafka集成示例 ===")
# 创建Kafka集成管理器
kafka_integration = HBaseKafkaIntegration()
print("1. 创建生产者:")
producer_id = kafka_integration.create_producer(
producer_id="hbase-producer-001",
bootstrap_servers=["localhost:9092"]
)
print("\n2. 发送HBase操作消息:")
# 发送PUT操作消息
kafka_integration.send_message(
producer_id=producer_id,
topic="hbase-puts",
key="user001",
message_type=KafkaMessageType.PUT,
data={
"table": "user_table",
"row_key": "user001",
"column_family": "cf1",
"qualifier": "name",
"value": "Alice"
}
)
# 发送DELETE操作消息
kafka_integration.send_message(
producer_id=producer_id,
topic="hbase-deletes",
key="user002",
message_type=KafkaMessageType.DELETE,
data={
"table": "user_table",
"row_key": "user002"
}
)
print("\n3. 创建消费者:")
consumer_config = KafkaConsumerConfig(
consumer_id="hbase-consumer-001",
group_id="hbase-processors",
topics=["hbase-puts", "hbase-deletes"],
bootstrap_servers=["localhost:9092"],
batch_size=50
)
consumer_id = kafka_integration.create_consumer(consumer_config)
# 定义消息处理器
def process_hbase_message(message: KafkaMessage):
"""处理HBase消息"""
print(f" 处理消息: {message.topic} - {message.key}")
print(f" 类型: {message.message_type.value}")
print(f" 数据: {message.value}")
# 模拟处理延迟
time.sleep(0.01)
print("\n4. 启动消费者:")
kafka_integration.start_consumer(consumer_id, process_hbase_message)
# 等待消息处理
time.sleep(2)
print("\n5. 检查消费者延迟:")
lag_info = kafka_integration.get_consumer_lag(consumer_id)
for topic, lag in lag_info.items():
print(f" {topic}: {lag} 条消息延迟")
print("\n6. 启用CDC:")
cdc = HBaseChangeDataCapture(kafka_integration)
cdc.enable_cdc("user_table", producer_id)
# 模拟CDC捕获操作
cdc.capture_put_operation(
table_name="user_table",
row_key="user003",
column_family="cf1",
qualifier="email",
value="user003@example.com",
producer_id=producer_id
)
cdc.capture_delete_operation(
table_name="user_table",
row_key="user004",
producer_id=producer_id
)
time.sleep(1)
print("\n7. 集成统计:")
integration_stats = kafka_integration.get_integration_statistics()
print(f" 生产者数量: {integration_stats['producers']['total_count']}")
print(f" 发送消息总数: {integration_stats['producers']['total_messages_sent']}")
print(f" 消费者数量: {integration_stats['consumers']['total_count']}")
print(f" 运行中消费者: {integration_stats['consumers']['running_count']}")
print(f" 消费消息总数: {integration_stats['consumers']['total_messages_consumed']}")
print(f" 总延迟: {integration_stats['consumers']['total_lag']} 条消息")
print("\n 主题消息分布:")
for topic, count in integration_stats['topics']['message_distribution'].items():
print(f" {topic}: {count} 条消息")
# 停止消费者
kafka_integration.stop_consumer(consumer_id)
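The consumer above polls an in-memory queue. A real pipeline would read the same JSON messages from Kafka and apply them to HBase; a minimal sketch, assuming the kafka-python and happybase packages and an HBase Thrift gateway on localhost (both are assumptions, neither appears elsewhere in this chapter):

import json

def run_real_put_consumer() -> None:
    # Third-party imports kept inside the function so the simulation above
    # still runs when kafka-python / happybase are not installed.
    from kafka import KafkaConsumer   # pip install kafka-python
    import happybase                  # pip install happybase (HBase Thrift client)

    consumer = KafkaConsumer(
        "hbase-puts",
        bootstrap_servers=["localhost:9092"],
        group_id="hbase-processors",
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    table = happybase.Connection("localhost").table("user_table")
    for record in consumer:   # blocks and iterates forever
        msg = record.value    # same layout as the simulated "hbase-puts" messages
        column = f"{msg['column_family']}:{msg['qualifier']}".encode()
        table.put(msg["row_key"].encode(), {column: str(msg["value"]).encode()})

# run_real_put_consumer()  # requires a running Kafka broker and HBase Thrift server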
2.2 Integration with Spark
class SparkJobType(Enum):
"""Spark作业类型枚举"""
BATCH_READ = "batch_read"
BATCH_WRITE = "batch_write"
STREAMING_READ = "streaming_read"
STREAMING_WRITE = "streaming_write"
ML_TRAINING = "ml_training"
ML_INFERENCE = "ml_inference"
ETL = "etl"
class SparkJobStatus(Enum):
"""Spark作业状态枚举"""
SUBMITTED = "submitted"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class SparkJob:
"""Spark作业数据类"""
job_id: str
job_name: str
job_type: SparkJobType
application_id: str
status: SparkJobStatus = SparkJobStatus.SUBMITTED
submit_time: datetime = field(default_factory=datetime.now)
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
driver_memory: str = "2g"
executor_memory: str = "2g"
executor_cores: int = 2
num_executors: int = 4
input_path: Optional[str] = None
output_path: Optional[str] = None
hbase_table: Optional[str] = None
records_processed: int = 0
error_message: Optional[str] = None
configuration: Dict[str, str] = field(default_factory=dict)
class HBaseSparkIntegration:
"""HBase与Spark集成管理器"""
def __init__(self):
self.spark_jobs: Dict[str, SparkJob] = {}
self.job_history: List[Dict[str, Any]] = []
self.running_jobs: Set[str] = set()
self.spark_context_active = False
self._lock = threading.Lock()
def initialize_spark_context(self, app_name: str = "HBase-Spark-Integration") -> bool:
"""初始化Spark上下文"""
if self.spark_context_active:
print("Spark上下文已激活")
return True
# 模拟Spark上下文初始化
print(f"正在初始化Spark上下文: {app_name}")
time.sleep(1) # 模拟初始化时间
self.spark_context_active = True
print("Spark上下文初始化完成")
return True
def submit_batch_read_job(self, table_name: str, output_path: str,
start_row: str = None, end_row: str = None) -> str:
"""提交批量读取作业"""
job_id = f"spark_read_{int(time.time())}_{random.randint(1000, 9999)}"
app_id = f"application_{int(time.time())}"
job = SparkJob(
job_id=job_id,
job_name=f"BatchRead-{table_name}",
job_type=SparkJobType.BATCH_READ,
application_id=app_id,
hbase_table=table_name,
output_path=output_path,
configuration={
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.hbase.table.name": table_name,
"spark.hbase.start.row": start_row or "",
"spark.hbase.end.row": end_row or ""
}
)
with self._lock:
self.spark_jobs[job_id] = job
# 启动作业执行线程
job_thread = threading.Thread(target=self._execute_batch_read_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"Spark批量读取作业已提交: {job_id}")
return job_id
def _execute_batch_read_job(self, job_id: str):
"""执行批量读取作业"""
job = self.spark_jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = SparkJobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行Spark批量读取: {job_id}")
# 模拟Spark作业执行阶段
stages = ["扫描HBase表", "数据转换", "写入输出"]
for i, stage in enumerate(stages):
print(f" 阶段 {i+1}: {stage}")
time.sleep(0.5) # 模拟每个阶段的执行时间
# 模拟读取的记录数
records_read = random.randint(50000, 200000)
job.records_processed = records_read
job.status = SparkJobStatus.SUCCEEDED
job.end_time = datetime.now()
print(f"Spark批量读取完成: {job_id} (读取 {records_read:,} 条记录)")
except Exception as e:
job.status = SparkJobStatus.FAILED
job.error_message = str(e)
job.end_time = datetime.now()
print(f"Spark批量读取失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def submit_batch_write_job(self, input_path: str, table_name: str,
column_family: str) -> str:
"""提交批量写入作业"""
job_id = f"spark_write_{int(time.time())}_{random.randint(1000, 9999)}"
app_id = f"application_{int(time.time())}"
job = SparkJob(
job_id=job_id,
job_name=f"BatchWrite-{table_name}",
job_type=SparkJobType.BATCH_WRITE,
application_id=app_id,
input_path=input_path,
hbase_table=table_name,
configuration={
"spark.sql.adaptive.enabled": "true",
"spark.hbase.table.name": table_name,
"spark.hbase.column.family": column_family,
"spark.hbase.write.batch.size": "1000"
}
)
with self._lock:
self.spark_jobs[job_id] = job
# 启动作业执行线程
job_thread = threading.Thread(target=self._execute_batch_write_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"Spark批量写入作业已提交: {job_id}")
return job_id
def _execute_batch_write_job(self, job_id: str):
"""执行批量写入作业"""
job = self.spark_jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = SparkJobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行Spark批量写入: {job_id}")
# 模拟Spark作业执行阶段
stages = ["读取输入数据", "数据预处理", "批量写入HBase"]
for i, stage in enumerate(stages):
print(f" 阶段 {i+1}: {stage}")
time.sleep(0.5)
# 模拟写入的记录数
records_written = random.randint(30000, 150000)
job.records_processed = records_written
job.status = SparkJobStatus.SUCCEEDED
job.end_time = datetime.now()
print(f"Spark批量写入完成: {job_id} (写入 {records_written:,} 条记录)")
except Exception as e:
job.status = SparkJobStatus.FAILED
job.error_message = str(e)
job.end_time = datetime.now()
print(f"Spark批量写入失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def submit_streaming_job(self, kafka_topics: List[str], table_name: str,
checkpoint_location: str) -> str:
"""提交流处理作业"""
job_id = f"spark_stream_{int(time.time())}_{random.randint(1000, 9999)}"
app_id = f"application_{int(time.time())}"
job = SparkJob(
job_id=job_id,
job_name=f"Streaming-{table_name}",
job_type=SparkJobType.STREAMING_WRITE,
application_id=app_id,
hbase_table=table_name,
configuration={
"spark.sql.streaming.checkpointLocation": checkpoint_location,
"spark.sql.streaming.stateStore.providerClass": "org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
"kafka.topics": ",".join(kafka_topics),
"kafka.bootstrap.servers": "localhost:9092",
"hbase.table.name": table_name
}
)
with self._lock:
self.spark_jobs[job_id] = job
# 启动流处理作业线程
job_thread = threading.Thread(target=self._execute_streaming_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"Spark流处理作业已提交: {job_id}")
return job_id
def _execute_streaming_job(self, job_id: str):
"""执行流处理作业"""
job = self.spark_jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = SparkJobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行Spark流处理: {job_id}")
# 模拟流处理(持续运行)
batch_count = 0
total_records = 0
while job.status == SparkJobStatus.RUNNING and batch_count < 10: # 限制批次数用于演示
batch_count += 1
batch_records = random.randint(100, 1000)
total_records += batch_records
print(f" 批次 {batch_count}: 处理 {batch_records} 条记录")
time.sleep(1) # 模拟批次间隔
job.records_processed = total_records
job.status = SparkJobStatus.SUCCEEDED
job.end_time = datetime.now()
print(f"Spark流处理完成: {job_id} (总计处理 {total_records:,} 条记录)")
except Exception as e:
job.status = SparkJobStatus.FAILED
job.error_message = str(e)
job.end_time = datetime.now()
print(f"Spark流处理失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def submit_ml_training_job(self, training_table: str, model_output_path: str,
algorithm: str = "logistic_regression") -> str:
"""提交机器学习训练作业"""
job_id = f"spark_ml_{int(time.time())}_{random.randint(1000, 9999)}"
app_id = f"application_{int(time.time())}"
job = SparkJob(
job_id=job_id,
job_name=f"MLTraining-{algorithm}",
job_type=SparkJobType.ML_TRAINING,
application_id=app_id,
hbase_table=training_table,
output_path=model_output_path,
executor_memory="4g",
num_executors=8,
configuration={
"spark.ml.algorithm": algorithm,
"spark.ml.feature.columns": "cf1:feature1,cf1:feature2,cf1:feature3",
"spark.ml.label.column": "cf1:label",
"spark.ml.model.output": model_output_path
}
)
with self._lock:
self.spark_jobs[job_id] = job
# 启动ML训练作业线程
job_thread = threading.Thread(target=self._execute_ml_training_job, args=(job_id,))
job_thread.daemon = True
job_thread.start()
print(f"Spark ML训练作业已提交: {job_id}")
return job_id
def _execute_ml_training_job(self, job_id: str):
"""执行ML训练作业"""
job = self.spark_jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = SparkJobStatus.RUNNING
job.start_time = datetime.now()
print(f"开始执行Spark ML训练: {job_id}")
# 模拟ML训练阶段
stages = ["数据加载", "特征工程", "模型训练", "模型评估", "模型保存"]
for i, stage in enumerate(stages):
print(f" 阶段 {i+1}: {stage}")
time.sleep(0.8) # ML训练通常需要更长时间
# 模拟训练数据量
training_records = random.randint(100000, 500000)
job.records_processed = training_records
# 模拟模型评估结果
accuracy = random.uniform(0.85, 0.95)
print(f" 模型准确率: {accuracy:.3f}")
job.status = SparkJobStatus.SUCCEEDED
job.end_time = datetime.now()
print(f"Spark ML训练完成: {job_id} (训练 {training_records:,} 条记录)")
except Exception as e:
job.status = SparkJobStatus.FAILED
job.error_message = str(e)
job.end_time = datetime.now()
print(f"Spark ML训练失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
def cancel_job(self, job_id: str) -> bool:
"""取消作业"""
if job_id not in self.spark_jobs:
return False
job = self.spark_jobs[job_id]
if job.status == SparkJobStatus.RUNNING:
job.status = SparkJobStatus.CANCELLED
job.end_time = datetime.now()
with self._lock:
self.running_jobs.discard(job_id)
self._record_job_history(job)
print(f"Spark作业已取消: {job_id}")
return True
return False
def get_job_status(self, job_id: str) -> Optional[Dict[str, Any]]:
"""获取作业状态"""
job = self.spark_jobs.get(job_id)
if not job:
return None
duration = 0
if job.start_time:
end_time = job.end_time or datetime.now()
duration = (end_time - job.start_time).total_seconds()
return {
"job_id": job.job_id,
"job_name": job.job_name,
"job_type": job.job_type.value,
"application_id": job.application_id,
"status": job.status.value,
"submit_time": job.submit_time.isoformat(),
"start_time": job.start_time.isoformat() if job.start_time else None,
"end_time": job.end_time.isoformat() if job.end_time else None,
"duration_seconds": duration,
"records_processed": job.records_processed,
"driver_memory": job.driver_memory,
"executor_memory": job.executor_memory,
"num_executors": job.num_executors,
"error_message": job.error_message
}
def _record_job_history(self, job: SparkJob):
"""记录作业历史"""
duration = 0
if job.start_time and job.end_time:
duration = (job.end_time - job.start_time).total_seconds()
self.job_history.append({
"job_id": job.job_id,
"job_name": job.job_name,
"job_type": job.job_type.value,
"status": job.status.value,
"duration_seconds": duration,
"records_processed": job.records_processed,
"submit_time": job.submit_time.isoformat()
})
# 保留最近100条记录
if len(self.job_history) > 100:
self.job_history = self.job_history[-100:]
def get_spark_statistics(self) -> Dict[str, Any]:
"""获取Spark统计信息"""
if not self.job_history:
return {"total_jobs": 0}
stats = {
"total_jobs": len(self.job_history),
"running_jobs": len(self.running_jobs),
"spark_context_active": self.spark_context_active,
"job_type_distribution": {},
"status_distribution": {},
"performance_metrics": {
"average_duration": 0,
"total_records_processed": 0,
"success_rate": 0
}
}
total_duration = 0
total_records = 0
successful_jobs = 0
for job_record in self.job_history:
# 按作业类型统计
job_type = job_record["job_type"]
if job_type not in stats["job_type_distribution"]:
stats["job_type_distribution"][job_type] = 0
stats["job_type_distribution"][job_type] += 1
# 按状态统计
status = job_record["status"]
if status not in stats["status_distribution"]:
stats["status_distribution"][status] = 0
stats["status_distribution"][status] += 1
# 性能统计
total_duration += job_record["duration_seconds"]
total_records += job_record["records_processed"]
if status == "succeeded":
successful_jobs += 1
if len(self.job_history) > 0:
stats["performance_metrics"]["average_duration"] = (
total_duration / len(self.job_history)
)
stats["performance_metrics"]["total_records_processed"] = total_records
stats["performance_metrics"]["success_rate"] = (
successful_jobs / len(self.job_history)
)
return stats
# HBase与Spark集成示例
print("\n=== HBase与Spark集成示例 ===")
# 创建Spark集成管理器
spark_integration = HBaseSparkIntegration()
print("1. 初始化Spark上下文:")
spark_integration.initialize_spark_context("HBase-Analytics")
print("\n2. 提交批量读取作业:")
read_job_id = spark_integration.submit_batch_read_job(
table_name="user_table",
output_path="/data/spark/user_export",
start_row="user001",
end_row="user999"
)
print("\n3. 提交批量写入作业:")
write_job_id = spark_integration.submit_batch_write_job(
input_path="/data/input/new_users.parquet",
table_name="user_table",
column_family="cf1"
)
print("\n4. 提交流处理作业:")
stream_job_id = spark_integration.submit_streaming_job(
kafka_topics=["user-events", "user-actions"],
table_name="user_activity",
checkpoint_location="/data/checkpoints/user_stream"
)
print("\n5. 提交ML训练作业:")
ml_job_id = spark_integration.submit_ml_training_job(
training_table="user_features",
model_output_path="/models/user_prediction",
algorithm="random_forest"
)
# 等待作业完成
time.sleep(5)
print("\n6. 检查作业状态:")
for job_id in [read_job_id, write_job_id, stream_job_id, ml_job_id]:
status = spark_integration.get_job_status(job_id)
if status:
print(f" {status['job_name']}:")
print(f" 状态: {status['status']}")
print(f" 应用ID: {status['application_id']}")
print(f" 耗时: {status['duration_seconds']:.1f} 秒")
print(f" 处理记录: {status['records_processed']:,}")
print(f" 执行器数量: {status['num_executors']}")
print()
print("\n7. Spark统计:")
spark_stats = spark_integration.get_spark_statistics()
print(f" 总作业数: {spark_stats['total_jobs']}")
print(f" 运行中作业: {spark_stats['running_jobs']}")
print(f" Spark上下文状态: {'活跃' if spark_stats['spark_context_active'] else '未激活'}")
print(f" 成功率: {spark_stats['performance_metrics']['success_rate']:.1%}")
print(f" 平均耗时: {spark_stats['performance_metrics']['average_duration']:.1f} 秒")
print(f" 总处理记录: {spark_stats['performance_metrics']['total_records_processed']:,}")
print("\n 作业类型分布:")
for job_type, count in spark_stats['job_type_distribution'].items():
print(f" {job_type}: {count} 个")
2.3 Integration with Elasticsearch
class ESOperationType(Enum):
"""Elasticsearch操作类型枚举"""
INDEX = "index"
BULK_INDEX = "bulk_index"
SEARCH = "search"
UPDATE = "update"
DELETE = "delete"
SYNC = "sync"
REINDEX = "reindex"
class ESSyncStatus(Enum):
"""ES同步状态枚举"""
IDLE = "idle"
SYNCING = "syncing"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
@dataclass
class ESDocument:
"""Elasticsearch文档数据类"""
doc_id: str
index_name: str
doc_type: str = "_doc"
source: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
version: int = 1
routing: Optional[str] = None
@dataclass
class ESSyncJob:
"""ES同步作业数据类"""
job_id: str
job_name: str
operation_type: ESOperationType
hbase_table: str
es_index: str
status: ESSyncStatus = ESSyncStatus.IDLE
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
documents_processed: int = 0
documents_failed: int = 0
batch_size: int = 1000
error_message: Optional[str] = None
mapping_config: Dict[str, str] = field(default_factory=dict)
class HBaseElasticsearchIntegration:
"""HBase与Elasticsearch集成管理器"""
def __init__(self, es_hosts: List[str] = None):
self.es_hosts = es_hosts or ["localhost:9200"]
self.sync_jobs: Dict[str, ESSyncJob] = {}
self.documents: Dict[str, List[ESDocument]] = {} # 按索引存储文档
self.sync_history: List[Dict[str, Any]] = []
self.running_jobs: Set[str] = set()
self.es_connected = False
self._lock = threading.Lock()
def connect_elasticsearch(self) -> bool:
"""连接Elasticsearch"""
if self.es_connected:
print("Elasticsearch已连接")
return True
print(f"正在连接Elasticsearch: {', '.join(self.es_hosts)}")
time.sleep(0.5) # 模拟连接时间
self.es_connected = True
print("Elasticsearch连接成功")
return True
def create_index_mapping(self, index_name: str, mapping: Dict[str, Any]) -> bool:
"""创建索引映射"""
if not self.es_connected:
self.connect_elasticsearch()
print(f"创建索引映射: {index_name}")
# 模拟创建索引映射
if index_name not in self.documents:
self.documents[index_name] = []
print(f"索引映射创建完成: {index_name}")
return True
def submit_sync_job(self, hbase_table: str, es_index: str,
operation_type: ESOperationType = ESOperationType.SYNC,
mapping_config: Dict[str, str] = None) -> str:
"""提交同步作业"""
job_id = f"es_sync_{int(time.time())}_{random.randint(1000, 9999)}"
job = ESSyncJob(
job_id=job_id,
job_name=f"Sync-{hbase_table}-to-{es_index}",
operation_type=operation_type,
hbase_table=hbase_table,
es_index=es_index,
mapping_config=mapping_config or {
"rowkey": "id",
"cf1:name": "name",
"cf1:email": "email",
"cf1:age": "age",
"cf1:city": "city"
}
)
with self._lock:
self.sync_jobs[job_id] = job
# 启动同步作业线程
sync_thread = threading.Thread(target=self._execute_sync_job, args=(job_id,))
sync_thread.daemon = True
sync_thread.start()
print(f"ES同步作业已提交: {job_id}")
return job_id
def _execute_sync_job(self, job_id: str):
"""执行同步作业"""
job = self.sync_jobs[job_id]
try:
with self._lock:
self.running_jobs.add(job_id)
job.status = ESSyncStatus.SYNCING
job.start_time = datetime.now()
print(f"开始执行ES同步: {job_id}")
# 确保ES索引存在
if job.es_index not in self.documents:
self.documents[job.es_index] = []
# 模拟从HBase读取数据并同步到ES
total_batches = random.randint(5, 15)
for batch_num in range(1, total_batches + 1):
# Honour pause_sync_job(): wait here while the job is paused, and continue once resume_sync_job() sets it back to SYNCING
while job.status == ESSyncStatus.PAUSED:
time.sleep(0.2)
print(f" 批次 {batch_num}/{total_batches}: 同步数据")
# 模拟批量处理
batch_docs = []
batch_size = random.randint(500, 1500)
for i in range(batch_size):
doc_id = f"{job.hbase_table}_doc_{job.documents_processed + i + 1}"
# 模拟HBase数据转换为ES文档
doc = ESDocument(
doc_id=doc_id,
index_name=job.es_index,
source={
"id": doc_id,
"name": f"User {job.documents_processed + i + 1}",
"email": f"user{job.documents_processed + i + 1}@example.com",
"age": random.randint(18, 65),
"city": random.choice(["北京", "上海", "广州", "深圳", "杭州"]),
"sync_time": datetime.now().isoformat(),
"source_table": job.hbase_table
}
)
batch_docs.append(doc)
# 模拟批量索引到ES
self.documents[job.es_index].extend(batch_docs)
job.documents_processed += len(batch_docs)
# 模拟一些失败的文档
if random.random() < 0.1: # 10%的批次可能有失败
failed_count = random.randint(1, 10)
job.documents_failed += failed_count
print(f" 批次 {batch_num}: {failed_count} 个文档索引失败")
time.sleep(0.3) # 模拟批次处理时间
job.status = ESSyncStatus.COMPLETED
job.end_time = datetime.now()
print(f"ES同步完成: {job_id} (处理 {job.documents_processed:,} 个文档, 失败 {job.documents_failed} 个)")
except Exception as e:
job.status = ESSyncStatus.FAILED
job.error_message = str(e)
job.end_time = datetime.now()
print(f"ES同步失败: {job_id} - {e}")
finally:
with self._lock:
self.running_jobs.discard(job_id)
self._record_sync_history(job)
def search_documents(self, index_name: str, query: Dict[str, Any],
size: int = 10) -> List[Dict[str, Any]]:
"""搜索文档"""
if index_name not in self.documents:
return []
docs = self.documents[index_name]
results = []
# Simple query simulation: scan the documents and stop once `size` matches have been collected
for doc in docs:
if len(results) >= size:
break
if self._match_query(doc.source, query):
results.append({
"_id": doc.doc_id,
"_index": doc.index_name,
"_source": doc.source,
"_score": random.uniform(0.5, 1.0)
})
print(f"搜索完成: 在索引 {index_name} 中找到 {len(results)} 个匹配文档")
return results
def _match_query(self, doc_source: Dict[str, Any], query: Dict[str, Any]) -> bool:
"""简单的查询匹配逻辑"""
# 模拟简单的匹配逻辑
if "match" in query:
for field, value in query["match"].items():
if field in doc_source:
if str(value).lower() in str(doc_source[field]).lower():
return True
if "term" in query:
for field, value in query["term"].items():
if field in doc_source and doc_source[field] == value:
return True
if "range" in query:
for field, range_query in query["range"].items():
if field in doc_source:
doc_value = doc_source[field]
if isinstance(doc_value, (int, float)):
if "gte" in range_query and doc_value >= range_query["gte"]:
return True
if "lte" in range_query and doc_value <= range_query["lte"]:
return True
return False
def bulk_index_documents(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, int]:
"""批量索引文档"""
if not self.es_connected:
self.connect_elasticsearch()
if index_name not in self.documents:
self.documents[index_name] = []
successful = 0
failed = 0
for doc_data in documents:
try:
doc_id = doc_data.get("id", f"doc_{int(time.time())}_{random.randint(1000, 9999)}")
doc = ESDocument(
doc_id=doc_id,
index_name=index_name,
source=doc_data
)
self.documents[index_name].append(doc)
successful += 1
except Exception as e:
failed += 1
print(f"文档索引失败: {e}")
result = {"successful": successful, "failed": failed}
print(f"批量索引完成: 成功 {successful} 个, 失败 {failed} 个")
return result
def pause_sync_job(self, job_id: str) -> bool:
"""暂停同步作业"""
if job_id not in self.sync_jobs:
return False
job = self.sync_jobs[job_id]
if job.status == ESSyncStatus.SYNCING:
job.status = ESSyncStatus.PAUSED
print(f"同步作业已暂停: {job_id}")
return True
return False
def resume_sync_job(self, job_id: str) -> bool:
"""恢复同步作业"""
if job_id not in self.sync_jobs:
return False
job = self.sync_jobs[job_id]
if job.status == ESSyncStatus.PAUSED:
job.status = ESSyncStatus.SYNCING
print(f"同步作业已恢复: {job_id}")
return True
return False
def get_sync_status(self, job_id: str) -> Optional[Dict[str, Any]]:
"""获取同步状态"""
job = self.sync_jobs.get(job_id)
if not job:
return None
duration = 0
if job.start_time:
end_time = job.end_time or datetime.now()
duration = (end_time - job.start_time).total_seconds()
return {
"job_id": job.job_id,
"job_name": job.job_name,
"operation_type": job.operation_type.value,
"hbase_table": job.hbase_table,
"es_index": job.es_index,
"status": job.status.value,
"start_time": job.start_time.isoformat() if job.start_time else None,
"end_time": job.end_time.isoformat() if job.end_time else None,
"duration_seconds": duration,
"documents_processed": job.documents_processed,
"documents_failed": job.documents_failed,
"batch_size": job.batch_size,
"error_message": job.error_message
}
def _record_sync_history(self, job: ESSyncJob):
"""记录同步历史"""
duration = 0
if job.start_time and job.end_time:
duration = (job.end_time - job.start_time).total_seconds()
self.sync_history.append({
"job_id": job.job_id,
"job_name": job.job_name,
"operation_type": job.operation_type.value,
"status": job.status.value,
"duration_seconds": duration,
"documents_processed": job.documents_processed,
"documents_failed": job.documents_failed,
"start_time": job.start_time.isoformat() if job.start_time else None
})
# 保留最近50条记录
if len(self.sync_history) > 50:
self.sync_history = self.sync_history[-50:]
def get_index_statistics(self, index_name: str) -> Dict[str, Any]:
"""获取索引统计信息"""
if index_name not in self.documents:
return {"document_count": 0}
docs = self.documents[index_name]
# 统计文档类型分布
type_distribution = {}
for doc in docs:
doc_type = doc.doc_type
if doc_type not in type_distribution:
type_distribution[doc_type] = 0
type_distribution[doc_type] += 1
# 计算索引大小(模拟)
estimated_size_mb = len(docs) * 0.001 # 假设每个文档1KB
return {
"index_name": index_name,
"document_count": len(docs),
"type_distribution": type_distribution,
"estimated_size_mb": round(estimated_size_mb, 2),
"last_updated": datetime.now().isoformat()
}
def get_integration_statistics(self) -> Dict[str, Any]:
"""获取集成统计信息"""
# Build the full statistics structure even before any job has finished, so callers can rely on every key being present
stats = {
"total_sync_jobs": len(self.sync_history),
"running_jobs": len(self.running_jobs),
"es_connected": self.es_connected,
"es_hosts": self.es_hosts,
"operation_type_distribution": {},
"status_distribution": {},
"performance_metrics": {
"average_duration": 0,
"total_documents_processed": 0,
"total_documents_failed": 0,
"success_rate": 0
},
"index_statistics": {}
}
total_duration = 0
total_processed = 0
total_failed = 0
successful_jobs = 0
for job_record in self.sync_history:
# 按操作类型统计
op_type = job_record["operation_type"]
if op_type not in stats["operation_type_distribution"]:
stats["operation_type_distribution"][op_type] = 0
stats["operation_type_distribution"][op_type] += 1
# 按状态统计
status = job_record["status"]
if status not in stats["status_distribution"]:
stats["status_distribution"][status] = 0
stats["status_distribution"][status] += 1
# 性能统计
total_duration += job_record["duration_seconds"]
total_processed += job_record["documents_processed"]
total_failed += job_record["documents_failed"]
if status == "completed":
successful_jobs += 1
if len(self.sync_history) > 0:
stats["performance_metrics"]["average_duration"] = (
total_duration / len(self.sync_history)
)
stats["performance_metrics"]["total_documents_processed"] = total_processed
stats["performance_metrics"]["total_documents_failed"] = total_failed
stats["performance_metrics"]["success_rate"] = (
successful_jobs / len(self.sync_history)
)
# 索引统计
for index_name in self.documents:
stats["index_statistics"][index_name] = self.get_index_statistics(index_name)
return stats
# HBase与Elasticsearch集成示例
print("\n=== HBase与Elasticsearch集成示例 ===")
# 创建ES集成管理器
es_integration = HBaseElasticsearchIntegration(["localhost:9200", "localhost:9201"])
print("1. 连接Elasticsearch:")
es_integration.connect_elasticsearch()
print("\n2. 创建索引映射:")
user_mapping = {
"properties": {
"id": {"type": "keyword"},
"name": {"type": "text", "analyzer": "standard"},
"email": {"type": "keyword"},
"age": {"type": "integer"},
"city": {"type": "keyword"},
"sync_time": {"type": "date"}
}
}
es_integration.create_index_mapping("users", user_mapping)
print("\n3. 提交同步作业:")
sync_job_id = es_integration.submit_sync_job(
hbase_table="user_table",
es_index="users",
operation_type=ESOperationType.SYNC
)
print("\n4. 批量索引文档:")
bulk_docs = [
{"id": "manual_001", "name": "张三", "email": "zhangsan@example.com", "age": 28, "city": "北京"},
{"id": "manual_002", "name": "李四", "email": "lisi@example.com", "age": 32, "city": "上海"},
{"id": "manual_003", "name": "王五", "email": "wangwu@example.com", "age": 25, "city": "广州"}
]
bulk_result = es_integration.bulk_index_documents("users", bulk_docs)
print(f"批量索引结果: {bulk_result}")
# 等待同步完成
time.sleep(3)
print("\n5. 检查同步状态:")
sync_status = es_integration.get_sync_status(sync_job_id)
if sync_status:
print(f" 作业名称: {sync_status['job_name']}")
print(f" 状态: {sync_status['status']}")
print(f" HBase表: {sync_status['hbase_table']}")
print(f" ES索引: {sync_status['es_index']}")
print(f" 耗时: {sync_status['duration_seconds']:.1f} 秒")
print(f" 处理文档: {sync_status['documents_processed']:,}")
print(f" 失败文档: {sync_status['documents_failed']}")
print("\n6. 搜索文档:")
# 搜索年龄在25-35之间的用户
search_query = {
"range": {
"age": {"gte": 25, "lte": 35}
}
}
search_results = es_integration.search_documents("users", search_query, size=5)
print(f"搜索结果 (年龄25-35): {len(search_results)} 个文档")
for result in search_results[:3]: # 显示前3个结果
source = result["_source"]
print(f" - {source['name']} ({source['age']}岁, {source['city']})")
# 搜索特定城市的用户
city_query = {
"term": {
"city": "北京"
}
}
city_results = es_integration.search_documents("users", city_query, size=5)
print(f"\n搜索结果 (北京用户): {len(city_results)} 个文档")
print("\n7. 索引统计:")
index_stats = es_integration.get_index_statistics("users")
print(f" 索引名称: {index_stats['index_name']}")
print(f" 文档数量: {index_stats['document_count']:,}")
print(f" 估计大小: {index_stats['estimated_size_mb']} MB")
print(f" 文档类型分布: {index_stats['type_distribution']}")
print("\n8. 集成统计:")
integration_stats = es_integration.get_integration_statistics()
print(f" 总同步作业: {integration_stats['total_sync_jobs']}")
print(f" 运行中作业: {integration_stats['running_jobs']}")
print(f" ES连接状态: {'已连接' if integration_stats['es_connected'] else '未连接'}")
print(f" 成功率: {integration_stats['performance_metrics']['success_rate']:.1%}")
print(f" 总处理文档: {integration_stats['performance_metrics']['total_documents_processed']:,}")
print(f" 总失败文档: {integration_stats['performance_metrics']['total_documents_failed']}")
print("\n 操作类型分布:")
for op_type, count in integration_stats['operation_type_distribution'].items():
print(f" {op_type}: {count} 个")
3. Summary
3.1 Key Takeaways
Hadoop ecosystem integration
- HDFS serves as HBase's underlying storage system
- MapReduce provides batch data-processing capabilities
- Supports data import/export and analytical jobs
Kafka integration (a producer sketch follows this list)
- Real-time stream processing
- Change data capture (CDC)
- High-throughput data transfer
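A minimal producer sketch for the CDC pattern above, assuming the kafka-python client and a local broker on localhost:9092; the topic name "hbase-cdc-events" and the event shape are illustrative only.
import json
from kafka import KafkaProducer

# Serialize change events as JSON and publish them to a Kafka topic (sketch, not part of the chapter's simulator)
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_change_event(table: str, rowkey: str, columns: dict):
    # One CDC-style event describing a single HBase row mutation
    producer.send("hbase-cdc-events", value={"table": table, "rowkey": rowkey, "columns": columns})

publish_change_event("user_table", "user_0001", {"cf1:name": "Alice"})
producer.flush()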
Spark integration
- Batch and streaming data processing
- Machine-learning model training
- High-performance data analysis
Elasticsearch integration
- Full-text search and complex queries
- Real-time data synchronization
- Multi-dimensional data analysis
3.2 Best Practices
Performance optimization (see the batching sketch after this list)
- Choose batch sizes that balance throughput against memory usage and request latency
- Use parallel processing to increase throughput
- Monitor resource usage continuously
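A minimal batching sketch (pure Python, relying only on the classes defined above): split a record stream into fixed-size chunks before handing them to a bulk API such as bulk_index_documents().
from itertools import islice
from typing import Any, Iterable, Iterator, List

def chunked(records: Iterable[Any], batch_size: int = 1000) -> Iterator[List[Any]]:
    # Yield successive lists of at most batch_size records
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        yield batch

# Usage: one bulk request per batch instead of one request per document
# for batch in chunked(all_documents, batch_size=1000):
#     es_integration.bulk_index_documents("users", batch)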
Data consistency (see the idempotency sketch after this list)
- Make write operations idempotent
- Detect and handle duplicate records
- Preserve transactional integrity where the pipeline requires it
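A minimal idempotency sketch: derive the Elasticsearch document _id deterministically from the HBase table and rowkey, so that replaying a sync overwrites documents on a real cluster instead of duplicating them (the in-memory simulator above simply appends, so this matters mainly against a real index). The helper name is illustrative.
import hashlib

def deterministic_doc_id(hbase_table: str, rowkey: str) -> str:
    # The same table + rowkey always maps to the same document ID
    return hashlib.sha1(f"{hbase_table}:{rowkey}".encode("utf-8")).hexdigest()

doc = {"id": deterministic_doc_id("user_table", "row_0001"), "name": "Alice"}
# es_integration.bulk_index_documents("users", [doc])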
Error handling (see the retry sketch after this list)
- Implement retry mechanisms for transient failures
- Record detailed error logs
- Provide failure-recovery capabilities
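A minimal retry sketch with exponential backoff and jitter for transient failures; the attempt count and delays are illustrative defaults.
import random
import time

def retry_with_backoff(operation, max_attempts: int = 5, base_delay: float = 0.5):
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.1f}s")
            time.sleep(delay)

# Usage: retry_with_backoff(lambda: es_integration.bulk_index_documents("users", bulk_docs))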
Monitoring and operations (see the health-check sketch after this list)
- Monitor integration status in real time
- Set up alerting mechanisms
- Check data quality regularly
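A minimal health-check sketch built on get_integration_statistics() from the class above; the 95% threshold and the print-based alert are placeholders for a real monitoring and alerting system.
def check_sync_health(integration, min_success_rate: float = 0.95) -> bool:
    stats = integration.get_integration_statistics()
    if stats.get("total_sync_jobs", 0) == 0:
        return True  # nothing to evaluate yet
    success_rate = stats["performance_metrics"]["success_rate"]
    if success_rate < min_success_rate:
        print(f"ALERT: ES sync success rate {success_rate:.1%} is below {min_success_rate:.0%}")
        return False
    return True

# Usage: check_sync_health(es_integration)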
3.3 Next Steps
Go deeper into each system
- Master the advanced features of Hadoop, Spark, Kafka, and Elasticsearch
- Learn the performance-tuning techniques of each system
Apply it in real projects
- Deploy integration solutions in production environments
- Handle large-scale data-integration scenarios
Architecture design
- Design complex data-processing pipelines
- Implement data integration in microservice architectures
Explore new technologies
- Follow cloud-native data-processing technologies
- Study real-time computing and edge computing
Having worked through this chapter, you now know how to integrate HBase with the major big-data systems. These integration capabilities let HBase play a central role in complex big-data ecosystems and provide enterprises with an end-to-end data-processing solution.