1. Overview of the HBase Data Model

1.1 Data Model Fundamentals

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
import json
import hashlib

class DataType(Enum):
    """数据类型枚举"""
    STRING = "string"
    INTEGER = "integer"
    LONG = "long"
    FLOAT = "float"
    DOUBLE = "double"
    BOOLEAN = "boolean"
    BYTES = "bytes"
    JSON = "json"

class CompressionType(Enum):
    """压缩类型枚举"""
    NONE = "NONE"
    GZIP = "GZ"
    LZO = "LZO"
    SNAPPY = "SNAPPY"
    LZ4 = "LZ4"
    BZIP2 = "BZIP2"

class BloomFilterType(Enum):
    """布隆过滤器类型枚举"""
    NONE = "NONE"
    ROW = "ROW"
    ROWCOL = "ROWCOL"
    ROWPREFIX_FIXED_LENGTH = "ROWPREFIX_FIXED_LENGTH"

@dataclass
class Cell:
    """HBase单元格数据类"""
    row_key: str
    column_family: str
    column_qualifier: str
    value: Any
    timestamp: int
    data_type: DataType = DataType.STRING
    
    def __post_init__(self):
        if self.timestamp == 0:
            self.timestamp = int(datetime.now().timestamp() * 1000)
    
    def get_column_name(self) -> str:
        """获取完整列名"""
        return f"{self.column_family}:{self.column_qualifier}"
    
    def serialize_value(self) -> bytes:
        """序列化值"""
        if self.data_type == DataType.STRING:
            return str(self.value).encode('utf-8')
        elif self.data_type == DataType.INTEGER:
            return str(int(self.value)).encode('utf-8')
        elif self.data_type == DataType.LONG:
            return str(int(self.value)).encode('utf-8')
        elif self.data_type == DataType.FLOAT:
            return str(float(self.value)).encode('utf-8')
        elif self.data_type == DataType.DOUBLE:
            return str(float(self.value)).encode('utf-8')
        elif self.data_type == DataType.BOOLEAN:
            return str(bool(self.value)).encode('utf-8')
        elif self.data_type == DataType.JSON:
            return json.dumps(self.value).encode('utf-8')
        elif self.data_type == DataType.BYTES:
            return self.value if isinstance(self.value, bytes) else str(self.value).encode('utf-8')
        else:
            return str(self.value).encode('utf-8')
    
    def deserialize_value(self, data: bytes) -> Any:
        """反序列化值"""
        try:
            str_value = data.decode('utf-8')
            
            if self.data_type == DataType.STRING:
                return str_value
            elif self.data_type == DataType.INTEGER:
                return int(str_value)
            elif self.data_type == DataType.LONG:
                return int(str_value)
            elif self.data_type == DataType.FLOAT:
                return float(str_value)
            elif self.data_type == DataType.DOUBLE:
                return float(str_value)
            elif self.data_type == DataType.BOOLEAN:
                return str_value.lower() == 'true'
            elif self.data_type == DataType.JSON:
                return json.loads(str_value)
            elif self.data_type == DataType.BYTES:
                return data
            else:
                return str_value
        except Exception as e:
            print(f"反序列化失败: {e}")
            return data

@dataclass
class ColumnFamily:
    """列族数据类"""
    name: str
    max_versions: int = 3
    ttl: int = 2147483647  # effectively never expires (Integer.MAX_VALUE seconds)
    compression: CompressionType = CompressionType.NONE
    bloom_filter: BloomFilterType = BloomFilterType.ROW
    block_cache_enabled: bool = True
    block_size: int = 65536  # 64KB
    in_memory: bool = False
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            "name": self.name,
            "maxVersions": self.max_versions,
            "ttl": self.ttl,
            "compression": self.compression.value,
            "bloomFilter": self.bloom_filter.value,
            "blockCacheEnabled": self.block_cache_enabled,
            "blockSize": self.block_size,
            "inMemory": self.in_memory
        }
    
    def validate(self) -> List[str]:
        """验证列族配置"""
        errors = []
        
        if not self.name or not self.name.strip():
            errors.append("Column family name must not be empty")
        
        if self.max_versions < 1:
            errors.append("max_versions must be at least 1")
        
        if self.ttl < 0:
            errors.append("TTL must not be negative")
        
        if self.block_size < 1024:
            errors.append("Block size must be at least 1KB")
        
        return errors

class HBaseDataModel:
    """HBase数据模型类"""
    
    def __init__(self):
        self.tables: Dict[str, 'HBaseTable'] = {}
        self.data_store: Dict[str, Dict[str, List[Cell]]] = {}  # table -> row_key -> cells
    
    def create_table(self, table_name: str, column_families: List[ColumnFamily]) -> bool:
        """创建表"""
        try:
            # 验证表名
            if not table_name or not table_name.strip():
                raise ValueError("表名不能为空")
            
            if table_name in self.tables:
                raise ValueError(f"表 {table_name} 已存在")
            
            # 验证列族
            if not column_families:
                raise ValueError("至少需要一个列族")
            
            for cf in column_families:
                errors = cf.validate()
                if errors:
                    raise ValueError(f"列族 {cf.name} 配置错误: {', '.join(errors)}")
            
            # 创建表
            table = HBaseTable(table_name, column_families)
            self.tables[table_name] = table
            self.data_store[table_name] = {}
            
            print(f"表 {table_name} 创建成功")
            return True
            
        except Exception as e:
            print(f"创建表失败: {e}")
            return False
    
    def put_cell(self, table_name: str, cell: Cell) -> bool:
        """插入单元格数据"""
        try:
            if table_name not in self.tables:
                raise ValueError(f"表 {table_name} 不存在")
            
            table = self.tables[table_name]
            
            # 验证列族是否存在
            if cell.column_family not in [cf.name for cf in table.column_families]:
                raise ValueError(f"列族 {cell.column_family} 不存在")
            
            # 初始化行数据
            if cell.row_key not in self.data_store[table_name]:
                self.data_store[table_name][cell.row_key] = []
            
            # 添加单元格
            self.data_store[table_name][cell.row_key].append(cell)
            
            # 清理旧版本(保持最大版本数限制)
            self._cleanup_versions(table_name, cell.row_key, cell.column_family, cell.column_qualifier)
            
            return True
            
        except Exception as e:
            print(f"插入数据失败: {e}")
            return False
    
    def get_cell(self, table_name: str, row_key: str, column_family: str, 
                column_qualifier: str, timestamp: Optional[int] = None) -> Optional[Cell]:
        """获取单元格数据"""
        try:
            if table_name not in self.tables:
                return None
            
            if row_key not in self.data_store[table_name]:
                return None
            
            cells = self.data_store[table_name][row_key]
            
            # 过滤匹配的单元格
            matching_cells = [
                cell for cell in cells
                if (cell.column_family == column_family and 
                    cell.column_qualifier == column_qualifier)
            ]
            
            if not matching_cells:
                return None
            
            # 如果指定了时间戳,返回精确匹配
            if timestamp is not None:
                for cell in matching_cells:
                    if cell.timestamp == timestamp:
                        return cell
                return None
            
            # 返回最新版本
            return max(matching_cells, key=lambda c: c.timestamp)
            
        except Exception as e:
            print(f"获取数据失败: {e}")
            return None
    
    def get_row(self, table_name: str, row_key: str) -> Dict[str, Cell]:
        """获取整行数据"""
        try:
            if table_name not in self.tables:
                return {}
            
            if row_key not in self.data_store[table_name]:
                return {}
            
            cells = self.data_store[table_name][row_key]
            
            # 按列分组,保留最新版本
            latest_cells = {}
            for cell in cells:
                column_name = cell.get_column_name()
                if (column_name not in latest_cells or 
                    cell.timestamp > latest_cells[column_name].timestamp):
                    latest_cells[column_name] = cell
            
            return latest_cells
            
        except Exception as e:
            print(f"获取行数据失败: {e}")
            return {}
    
    def scan_table(self, table_name: str, start_row: Optional[str] = None, 
                  end_row: Optional[str] = None, limit: int = 100) -> List[Dict[str, Cell]]:
        """扫描表数据"""
        try:
            if table_name not in self.tables:
                return []
            
            all_rows = []
            row_keys = sorted(self.data_store[table_name].keys())
            
            # 应用行键范围过滤
            if start_row:
                row_keys = [rk for rk in row_keys if rk >= start_row]
            if end_row:
                row_keys = [rk for rk in row_keys if rk < end_row]
            
            # 限制结果数量
            row_keys = row_keys[:limit]
            
            for row_key in row_keys:
                row_data = self.get_row(table_name, row_key)
                if row_data:
                    all_rows.append(row_data)
            
            return all_rows
            
        except Exception as e:
            print(f"扫描表失败: {e}")
            return []
    
    def delete_cell(self, table_name: str, row_key: str, column_family: str, 
                   column_qualifier: str, timestamp: Optional[int] = None) -> bool:
        """删除单元格数据"""
        try:
            if table_name not in self.tables:
                return False
            
            if row_key not in self.data_store[table_name]:
                return False
            
            cells = self.data_store[table_name][row_key]
            
            # 删除匹配的单元格
            original_count = len(cells)
            
            if timestamp is not None:
                # 删除特定时间戳的单元格
                self.data_store[table_name][row_key] = [
                    cell for cell in cells
                    if not (cell.column_family == column_family and 
                           cell.column_qualifier == column_qualifier and 
                           cell.timestamp == timestamp)
                ]
            else:
                # 删除所有版本
                self.data_store[table_name][row_key] = [
                    cell for cell in cells
                    if not (cell.column_family == column_family and 
                           cell.column_qualifier == column_qualifier)
                ]
            
            # 如果行为空,删除行
            if not self.data_store[table_name][row_key]:
                del self.data_store[table_name][row_key]
            
            return len(cells) != original_count
            
        except Exception as e:
            print(f"删除数据失败: {e}")
            return False
    
    def _cleanup_versions(self, table_name: str, row_key: str, 
                         column_family: str, column_qualifier: str):
        """清理旧版本数据"""
        try:
            table = self.tables[table_name]
            cf = next((cf for cf in table.column_families if cf.name == column_family), None)
            
            if not cf:
                return
            
            cells = self.data_store[table_name][row_key]
            
            # 找到同一列的所有单元格
            column_cells = [
                cell for cell in cells
                if (cell.column_family == column_family and 
                    cell.column_qualifier == column_qualifier)
            ]
            
            # 按时间戳排序,保留最新的版本
            column_cells.sort(key=lambda c: c.timestamp, reverse=True)
            
            if len(column_cells) > cf.max_versions:
                # 移除超出版本限制的单元格
                cells_to_remove = column_cells[cf.max_versions:]
                
                for cell_to_remove in cells_to_remove:
                    self.data_store[table_name][row_key].remove(cell_to_remove)
            
        except Exception as e:
            print(f"清理版本失败: {e}")
    
    def get_table_info(self, table_name: str) -> Optional[Dict[str, Any]]:
        """获取表信息"""
        if table_name not in self.tables:
            return None
        
        table = self.tables[table_name]
        row_count = len(self.data_store[table_name])
        
        # 计算每个列族的统计信息
        cf_stats = {}
        for cf in table.column_families:
            cf_stats[cf.name] = {
                "configuration": cf.to_dict(),
                "cell_count": 0,
                "total_size": 0
            }
        
        # 统计数据
        for row_key, cells in self.data_store[table_name].items():
            for cell in cells:
                if cell.column_family in cf_stats:
                    cf_stats[cell.column_family]["cell_count"] += 1
                    cf_stats[cell.column_family]["total_size"] += len(cell.serialize_value())
        
        return {
            "table_name": table_name,
            "row_count": row_count,
            "column_families": cf_stats,
            "created_time": table.created_time,
            "enabled": table.enabled
        }
    
    def list_tables(self) -> List[str]:
        """列出所有表"""
        return list(self.tables.keys())

@dataclass
class HBaseTable:
    """HBase表数据类"""
    name: str
    column_families: List[ColumnFamily]
    enabled: bool = True
    created_time: Optional[str] = None
    
    def __post_init__(self):
        if self.created_time is None:
            self.created_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

# Usage example
data_model = HBaseDataModel()

# Define column families
user_info_cf = ColumnFamily(
    name="info",
    max_versions=3,
    ttl=86400 * 30,  # 30 days
    compression=CompressionType.SNAPPY,
    bloom_filter=BloomFilterType.ROW
)

user_stats_cf = ColumnFamily(
    name="stats",
    max_versions=1,
    compression=CompressionType.GZIP,
    in_memory=True
)

# Create the table
success = data_model.create_table("users", [user_info_cf, user_stats_cf])
print(f"创建表结果: {success}")

# Put some cells
cells = [
    Cell("user001", "info", "name", "张三", 0, DataType.STRING),
    Cell("user001", "info", "age", 25, 0, DataType.INTEGER),
    Cell("user001", "info", "email", "zhangsan@example.com", 0, DataType.STRING),
    Cell("user001", "stats", "login_count", 100, 0, DataType.INTEGER),
    Cell("user001", "stats", "last_login", "2024-01-15 10:30:00", 0, DataType.STRING)
]

for cell in cells:
    result = data_model.put_cell("users", cell)
    print(f"插入 {cell.get_column_name()}: {result}")

# Read data back
row_data = data_model.get_row("users", "user001")
print(f"\n用户数据: {len(row_data)} 列")
for column_name, cell in row_data.items():
    print(f"  {column_name}: {cell.value}")

# Fetch table metadata
table_info = data_model.get_table_info("users")
print(f"\n表信息: {json.dumps(table_info, indent=2, ensure_ascii=False)}")

2. Table Design Principles

2.1 Row Key Design

class RowKeyPattern(Enum):
    """行键模式枚举"""
    SEQUENTIAL = "sequential"  # 顺序模式
    HASH_PREFIX = "hash_prefix"  # 哈希前缀
    REVERSE_TIMESTAMP = "reverse_timestamp"  # 反向时间戳
    COMPOSITE = "composite"  # 复合键
    SALTED = "salted"  # 加盐

class RowKeyDesigner:
    """行键设计器类"""
    
    def __init__(self):
        self.patterns = {
            RowKeyPattern.SEQUENTIAL: self._generate_sequential_key,
            RowKeyPattern.HASH_PREFIX: self._generate_hash_prefix_key,
            RowKeyPattern.REVERSE_TIMESTAMP: self._generate_reverse_timestamp_key,
            RowKeyPattern.COMPOSITE: self._generate_composite_key,
            RowKeyPattern.SALTED: self._generate_salted_key
        }
    
    def generate_row_key(self, pattern: RowKeyPattern, **kwargs) -> str:
        """生成行键"""
        if pattern not in self.patterns:
            raise ValueError(f"不支持的行键模式: {pattern}")
        
        return self.patterns[pattern](**kwargs)
    
    def _generate_sequential_key(self, prefix: str = "", sequence: int = 0, 
                               padding: int = 10) -> str:
        """生成顺序行键"""
        seq_str = str(sequence).zfill(padding)
        return f"{prefix}{seq_str}" if prefix else seq_str
    
    def _generate_hash_prefix_key(self, original_key: str, hash_length: int = 4) -> str:
        """生成哈希前缀行键"""
        hash_value = hashlib.md5(original_key.encode()).hexdigest()[:hash_length]
        return f"{hash_value}_{original_key}"
    
    def _generate_reverse_timestamp_key(self, prefix: str = "", 
                                      timestamp: Optional[int] = None) -> str:
        """生成反向时间戳行键"""
        if timestamp is None:
            timestamp = int(datetime.now().timestamp() * 1000)
        
        # 使用Long.MAX_VALUE - timestamp实现反向排序
        reverse_timestamp = 9223372036854775807 - timestamp
        
        if prefix:
            return f"{prefix}_{reverse_timestamp}"
        else:
            return str(reverse_timestamp)
    
    def _generate_composite_key(self, components: List[str], separator: str = "_") -> str:
        """生成复合行键"""
        if not components:
            raise ValueError("复合键组件不能为空")
        
        # 清理组件中的分隔符
        cleaned_components = [comp.replace(separator, "") for comp in components]
        return separator.join(cleaned_components)
    
    def _generate_salted_key(self, original_key: str, salt_buckets: int = 100) -> str:
        """生成加盐行键"""
        # 计算盐值
        hash_value = hashlib.md5(original_key.encode()).hexdigest()
        salt = int(hash_value, 16) % salt_buckets
        
        # 格式化盐值(确保固定长度)
        salt_str = str(salt).zfill(len(str(salt_buckets - 1)))
        
        return f"{salt_str}_{original_key}"
    
    def analyze_row_key_distribution(self, row_keys: List[str]) -> Dict[str, Any]:
        """分析行键分布"""
        if not row_keys:
            return {"error": "行键列表为空"}
        
        # 基本统计
        total_keys = len(row_keys)
        unique_keys = len(set(row_keys))
        avg_length = sum(len(key) for key in row_keys) / total_keys
        
        # 前缀分析
        prefix_distribution = {}
        for key in row_keys:
            prefix = key[:2] if len(key) >= 2 else key
            prefix_distribution[prefix] = prefix_distribution.get(prefix, 0) + 1
        
        # 热点检测
        sorted_prefixes = sorted(prefix_distribution.items(), key=lambda x: x[1], reverse=True)
        hotspot_threshold = total_keys * 0.1  # 10%阈值
        hotspots = [prefix for prefix, count in sorted_prefixes if count > hotspot_threshold]
        
        # 长度分布
        length_distribution = {}
        for key in row_keys:
            length = len(key)
            length_distribution[length] = length_distribution.get(length, 0) + 1
        
        return {
            "total_keys": total_keys,
            "unique_keys": unique_keys,
            "uniqueness_ratio": unique_keys / total_keys,
            "average_length": round(avg_length, 2),
            "prefix_distribution": dict(sorted_prefixes[:10]),  # 前10个前缀
            "hotspots": hotspots,
            "length_distribution": length_distribution,
            "recommendations": self._generate_recommendations(hotspots, unique_keys, total_keys)
        }
    
    def _generate_recommendations(self, hotspots: List[str], unique_keys: int, 
                                total_keys: int) -> List[str]:
        """生成优化建议"""
        recommendations = []
        
        if hotspots:
            recommendations.append(f"检测到热点前缀: {', '.join(hotspots)},建议使用加盐或哈希前缀")
        
        uniqueness_ratio = unique_keys / total_keys
        if uniqueness_ratio < 0.9:
            recommendations.append(f"行键唯一性较低({uniqueness_ratio:.2%}),建议增加更多区分字段")
        
        if not recommendations:
            recommendations.append("行键分布良好,无明显问题")
        
        return recommendations

# Usage example
row_key_designer = RowKeyDesigner()

# Generate row keys with different patterns
print("=== Row Key Generation Examples ===")

# Sequential key
seq_key = row_key_designer.generate_row_key(
    RowKeyPattern.SEQUENTIAL, 
    prefix="user", 
    sequence=123, 
    padding=8
)
print(f"Sequential key: {seq_key}")

# Hash-prefixed key
hash_key = row_key_designer.generate_row_key(
    RowKeyPattern.HASH_PREFIX, 
    original_key="user_zhangsan_20240115"
)
print(f"Hash-prefixed key: {hash_key}")

# Reverse-timestamp key
reverse_ts_key = row_key_designer.generate_row_key(
    RowKeyPattern.REVERSE_TIMESTAMP, 
    prefix="event"
)
print(f"Reverse-timestamp key: {reverse_ts_key}")

# Composite key
composite_key = row_key_designer.generate_row_key(
    RowKeyPattern.COMPOSITE, 
    components=["user", "zhangsan", "20240115"]
)
print(f"Composite key: {composite_key}")

# Salted key
salted_key = row_key_designer.generate_row_key(
    RowKeyPattern.SALTED, 
    original_key="user_zhangsan", 
    salt_buckets=100
)
print(f"Salted key: {salted_key}")

# Analyze row key distribution
print("\n=== Row Key Distribution Analysis ===")
sample_keys = [
    "user_001", "user_002", "user_003", "user_004", "user_005",
    "user_001_backup", "admin_001", "admin_002", "guest_001", "guest_002"
]

distribution_analysis = row_key_designer.analyze_row_key_distribution(sample_keys)
print(f"Analysis: {json.dumps(distribution_analysis, indent=2, ensure_ascii=False)}")

2.2 Column Family Design

class AccessPattern(Enum):
    """访问模式枚举"""
    READ_HEAVY = "read_heavy"
    WRITE_HEAVY = "write_heavy"
    BALANCED = "balanced"
    ANALYTICAL = "analytical"
    REAL_TIME = "real_time"

class DataCharacteristic(Enum):
    """数据特征枚举"""
    HOT_DATA = "hot_data"  # 热数据
    COLD_DATA = "cold_data"  # 冷数据
    LARGE_OBJECTS = "large_objects"  # 大对象
    SMALL_FREQUENT = "small_frequent"  # 小而频繁
    TIME_SERIES = "time_series"  # 时间序列

class ColumnFamilyDesigner:
    """列族设计器类"""
    
    def __init__(self):
        self.optimization_rules = self._initialize_optimization_rules()
    
    def _initialize_optimization_rules(self) -> Dict[str, Dict[str, Any]]:
        """初始化优化规则"""
        return {
            "read_heavy": {
                "block_cache_enabled": True,
                "bloom_filter": BloomFilterType.ROW,
                "compression": CompressionType.SNAPPY,
                "block_size": 65536,
                "in_memory": False
            },
            "write_heavy": {
                "block_cache_enabled": False,
                "bloom_filter": BloomFilterType.NONE,
                "compression": CompressionType.LZ4,
                "block_size": 131072,
                "in_memory": False
            },
            "analytical": {
                "block_cache_enabled": True,
                "bloom_filter": BloomFilterType.ROWCOL,
                "compression": CompressionType.GZIP,
                "block_size": 262144,
                "in_memory": False
            },
            "real_time": {
                "block_cache_enabled": True,
                "bloom_filter": BloomFilterType.ROW,
                "compression": CompressionType.NONE,
                "block_size": 32768,
                "in_memory": True
            },
            "hot_data": {
                "in_memory": True,
                "block_cache_enabled": True,
                "ttl": 86400 * 7  # 7天
            },
            "cold_data": {
                "in_memory": False,
                "compression": CompressionType.GZIP,
                "ttl": 86400 * 365  # 1年
            },
            "large_objects": {
                "compression": CompressionType.GZIP,
                "block_size": 1048576,  # 1MB
                "block_cache_enabled": False
            },
            "time_series": {
                "max_versions": 1,
                "compression": CompressionType.SNAPPY,
                "bloom_filter": BloomFilterType.ROW
            }
        }
    
    def design_column_family(self, name: str, access_pattern: AccessPattern, 
                           data_characteristic: DataCharacteristic, 
                           custom_config: Optional[Dict[str, Any]] = None) -> ColumnFamily:
        """设计列族"""
        # 基础配置
        config = {
            "name": name,
            "max_versions": 3,
            "ttl": 2147483647,
            "compression": CompressionType.NONE,
            "bloom_filter": BloomFilterType.ROW,
            "block_cache_enabled": True,
            "block_size": 65536,
            "in_memory": False
        }
        
        # Apply access-pattern optimizations
        pattern_config = self.optimization_rules.get(access_pattern.value, {})
        config.update(pattern_config)
        
        # Apply data-characteristic optimizations
        characteristic_config = self.optimization_rules.get(data_characteristic.value, {})
        config.update(characteristic_config)
        
        # Apply any custom overrides
        if custom_config:
            config.update(custom_config)
        
        return ColumnFamily(
            name=config["name"],
            max_versions=config["max_versions"],
            ttl=config["ttl"],
            compression=config["compression"],
            bloom_filter=config["bloom_filter"],
            block_cache_enabled=config["block_cache_enabled"],
            block_size=config["block_size"],
            in_memory=config["in_memory"]
        )
    
    def analyze_column_family_performance(self, cf: ColumnFamily, 
                                        usage_stats: Dict[str, Any]) -> Dict[str, Any]:
        """分析列族性能"""
        analysis = {
            "column_family": cf.name,
            "current_config": cf.to_dict(),
            "performance_metrics": {},
            "recommendations": []
        }
        
        # 读写比例分析
        read_ratio = usage_stats.get("read_ratio", 0.5)
        write_ratio = 1 - read_ratio
        
        # 数据大小分析
        avg_cell_size = usage_stats.get("avg_cell_size", 1024)
        total_data_size = usage_stats.get("total_data_size", 0)
        
        # 访问频率分析
        access_frequency = usage_stats.get("access_frequency", "medium")
        
        # 性能指标计算
        analysis["performance_metrics"] = {
            "read_ratio": read_ratio,
            "write_ratio": write_ratio,
            "avg_cell_size": avg_cell_size,
            "total_data_size": total_data_size,
            "access_frequency": access_frequency,
            "compression_efficiency": self._estimate_compression_efficiency(cf.compression, avg_cell_size)
        }
        
        # 生成优化建议
        recommendations = []
        
        # 读写比例优化
        if read_ratio > 0.8:
            if not cf.block_cache_enabled:
                recommendations.append("建议启用块缓存以提高读性能")
            if cf.bloom_filter == BloomFilterType.NONE:
                recommendations.append("建议启用布隆过滤器以减少磁盘I/O")
        elif write_ratio > 0.8:
            if cf.block_cache_enabled:
                recommendations.append("写密集场景建议禁用块缓存以节省内存")
            if cf.compression != CompressionType.LZ4:
                recommendations.append("写密集场景建议使用LZ4压缩以提高写入速度")
        
        # 数据大小优化
        if avg_cell_size > 100 * 1024:  # 100KB
            if cf.compression == CompressionType.NONE:
                recommendations.append("大数据单元建议启用压缩以节省存储空间")
            if cf.block_size < 256 * 1024:  # 256KB
                recommendations.append("大数据单元建议增加块大小以提高I/O效率")
        
        # 访问频率优化
        if access_frequency == "high" and not cf.in_memory:
            recommendations.append("高频访问数据建议启用内存存储")
        elif access_frequency == "low" and cf.in_memory:
            recommendations.append("低频访问数据建议禁用内存存储以节省内存")
        
        # TTL优化
        if cf.ttl == 2147483647:  # 永不过期
            recommendations.append("建议设置合适的TTL以自动清理过期数据")
        
        analysis["recommendations"] = recommendations
        
        return analysis
    
    def _estimate_compression_efficiency(self, compression: CompressionType, 
                                       avg_cell_size: int) -> float:
        """估算压缩效率"""
        # 简化的压缩比估算
        compression_ratios = {
            CompressionType.NONE: 1.0,
            CompressionType.GZIP: 0.3,
            CompressionType.LZO: 0.5,
            CompressionType.SNAPPY: 0.6,
            CompressionType.LZ4: 0.7,
            CompressionType.BZIP2: 0.25
        }
        
        base_ratio = compression_ratios.get(compression, 1.0)
        
        # Adjust the estimate for cell size
        if avg_cell_size < 1024:  # small values compress poorly
            return min(base_ratio * 1.5, 1.0)
        elif avg_cell_size > 10 * 1024:  # large values compress well
            return base_ratio * 0.8
        else:
            return base_ratio
    
    def generate_table_schema(self, table_name: str, 
                            schema_requirements: List[Dict[str, Any]]) -> List[ColumnFamily]:
        """生成表模式"""
        column_families = []
        
        for req in schema_requirements:
            cf_name = req.get("name")
            access_pattern = AccessPattern(req.get("access_pattern", "balanced"))
            data_characteristic = DataCharacteristic(req.get("data_characteristic", "small_frequent"))
            custom_config = req.get("custom_config", {})
            
            cf = self.design_column_family(
                cf_name, access_pattern, data_characteristic, custom_config
            )
            column_families.append(cf)
        
        return column_families

# Usage example
cf_designer = ColumnFamilyDesigner()

print("=== Column Family Design Examples ===")

# Design column families for different workloads
user_info_cf = cf_designer.design_column_family(
    "info",
    AccessPattern.READ_HEAVY,
    DataCharacteristic.HOT_DATA
)
print(f"User info column family: {user_info_cf.to_dict()}")

user_logs_cf = cf_designer.design_column_family(
    "logs",
    AccessPattern.WRITE_HEAVY,
    DataCharacteristic.TIME_SERIES,
    {"ttl": 86400 * 30}  # 30-day TTL
)
print(f"User logs column family: {user_logs_cf.to_dict()}")

# Performance analysis
usage_stats = {
    "read_ratio": 0.9,
    "avg_cell_size": 2048,
    "total_data_size": 1024 * 1024 * 100,  # 100MB
    "access_frequency": "high"
}

performance_analysis = cf_designer.analyze_column_family_performance(user_info_cf, usage_stats)
print(f"\nPerformance analysis: {json.dumps(performance_analysis, indent=2, ensure_ascii=False)}")

# Generate a complete table schema
schema_requirements = [
    {
        "name": "profile",
        "access_pattern": "read_heavy",
        "data_characteristic": "hot_data"
    },
    {
        "name": "activity",
        "access_pattern": "write_heavy",
        "data_characteristic": "time_series",
        "custom_config": {"max_versions": 1, "ttl": 86400 * 7}
    },
    {
        "name": "analytics",
        "access_pattern": "analytical",
        "data_characteristic": "cold_data"
    }
]

table_schema = cf_designer.generate_table_schema("user_data", schema_requirements)
print(f"\n表模式设计: {len(table_schema)} 个列族")
for cf in table_schema:
    print(f"  {cf.name}: {cf.to_dict()}")

3. Data Modeling Best Practices

3.1 Query-Driven Design

class QueryType(Enum):
    """查询类型枚举"""
    POINT_LOOKUP = "point_lookup"  # 点查询
    RANGE_SCAN = "range_scan"  # 范围扫描
    PREFIX_SCAN = "prefix_scan"  # 前缀扫描
    MULTI_GET = "multi_get"  # 批量获取
    AGGREGATION = "aggregation"  # 聚合查询

@dataclass
class QueryPattern:
    """查询模式数据类"""
    query_type: QueryType
    frequency: float  # 查询频率(每秒)
    row_key_pattern: str  # 行键模式
    column_filters: List[str]  # 列过滤器
    expected_result_size: int  # 预期结果大小
    latency_requirement: int  # 延迟要求(毫秒)

class DataModelOptimizer:
    """数据模型优化器类"""
    
    def __init__(self):
        self.query_patterns: List[QueryPattern] = []
        self.optimization_strategies = self._initialize_strategies()
    
    def _initialize_strategies(self) -> Dict[QueryType, Dict[str, Any]]:
        """初始化优化策略"""
        return {
            QueryType.POINT_LOOKUP: {
                "row_key_strategy": "exact_match",
                "bloom_filter": BloomFilterType.ROW,
                "block_cache": True,
                "compression": CompressionType.SNAPPY
            },
            QueryType.RANGE_SCAN: {
                "row_key_strategy": "range_friendly",
                "bloom_filter": BloomFilterType.NONE,
                "block_cache": True,
                "compression": CompressionType.SNAPPY
            },
            QueryType.PREFIX_SCAN: {
                "row_key_strategy": "prefix_optimized",
                "bloom_filter": BloomFilterType.ROWPREFIX_FIXED_LENGTH,
                "block_cache": True,
                "compression": CompressionType.LZ4
            },
            QueryType.MULTI_GET: {
                "row_key_strategy": "batch_friendly",
                "bloom_filter": BloomFilterType.ROW,
                "block_cache": True,
                "compression": CompressionType.SNAPPY
            },
            QueryType.AGGREGATION: {
                "row_key_strategy": "scan_optimized",
                "bloom_filter": BloomFilterType.NONE,
                "block_cache": False,
                "compression": CompressionType.GZIP
            }
        }
    
    def add_query_pattern(self, pattern: QueryPattern):
        """添加查询模式"""
        self.query_patterns.append(pattern)
    
    def analyze_query_patterns(self) -> Dict[str, Any]:
        """分析查询模式"""
        if not self.query_patterns:
            return {"error": "没有查询模式数据"}
        
        # 统计查询类型分布
        query_type_stats = {}
        total_frequency = 0
        
        for pattern in self.query_patterns:
            query_type = pattern.query_type.value
            query_type_stats[query_type] = query_type_stats.get(query_type, 0) + pattern.frequency
            total_frequency += pattern.frequency
        
        # 计算比例
        query_type_ratios = {
            qtype: freq / total_frequency 
            for qtype, freq in query_type_stats.items()
        }
        
        # 分析延迟要求
        latency_requirements = [p.latency_requirement for p in self.query_patterns]
        avg_latency_req = sum(latency_requirements) / len(latency_requirements)
        
        # 分析结果大小
        result_sizes = [p.expected_result_size for p in self.query_patterns]
        avg_result_size = sum(result_sizes) / len(result_sizes)
        
        return {
            "total_patterns": len(self.query_patterns),
            "total_frequency": total_frequency,
            "query_type_distribution": query_type_stats,
            "query_type_ratios": query_type_ratios,
            "average_latency_requirement": avg_latency_req,
            "average_result_size": avg_result_size,
            "dominant_query_type": max(query_type_ratios.items(), key=lambda x: x[1])[0]
        }
    
    def generate_optimization_recommendations(self) -> List[str]:
        """生成优化建议"""
        analysis = self.analyze_query_patterns()
        
        if "error" in analysis:
            return [analysis["error"]]
        
        recommendations = []
        
        # 基于主导查询类型的建议
        dominant_type = QueryType(analysis["dominant_query_type"])
        strategy = self.optimization_strategies[dominant_type]
        
        recommendations.append(f"主导查询类型: {dominant_type.value}")
        recommendations.append(f"建议行键策略: {strategy['row_key_strategy']}")
        recommendations.append(f"建议布隆过滤器: {strategy['bloom_filter'].value}")
        recommendations.append(f"建议压缩算法: {strategy['compression'].value}")
        
        # 基于延迟要求的建议
        if analysis["average_latency_requirement"] < 10:  # 10ms
            recommendations.append("超低延迟要求,建议启用内存存储")
        elif analysis["average_latency_requirement"] < 100:  # 100ms
            recommendations.append("低延迟要求,建议优化块缓存配置")
        
        # 基于结果大小的建议
        if analysis["average_result_size"] > 1024 * 1024:  # 1MB
            recommendations.append("大结果集查询,建议启用压缩")
        
        # 基于查询频率的建议
        if analysis["total_frequency"] > 1000:  # 1000 QPS
            recommendations.append("高频查询,建议增加RegionServer数量")
        
        return recommendations
    
    def design_denormalized_schema(self, entity_name: str, 
                                 relationships: List[Dict[str, Any]]) -> Dict[str, Any]:
        """设计反规范化模式"""
        schema = {
            "table_name": f"{entity_name}_denormalized",
            "row_key_design": {},
            "column_families": [],
            "denormalization_strategy": []
        }
        
        # 分析关系类型
        one_to_one = [r for r in relationships if r.get("type") == "one_to_one"]
        one_to_many = [r for r in relationships if r.get("type") == "one_to_many"]
        many_to_many = [r for r in relationships if r.get("type") == "many_to_many"]
        
        # 行键设计
        primary_entity = entity_name
        row_key_components = [primary_entity + "_id"]
        
        # 处理一对多关系
        if one_to_many:
            # 为一对多关系添加时间戳或序列号
            row_key_components.append("timestamp")
            schema["denormalization_strategy"].append(
                "一对多关系:在行键中包含时间戳,每个相关实体创建单独行"
            )
        
        schema["row_key_design"] = {
            "pattern": "composite",
            "components": row_key_components,
            "example": "_".join([f"{comp}_value" for comp in row_key_components])
        }
        
        # 列族设计
        # 主实体列族
        main_cf = {
            "name": "main",
            "description": f"{entity_name}的主要属性",
            "access_pattern": "read_heavy"
        }
        schema["column_families"].append(main_cf)
        
        # 一对一关系列族
        for rel in one_to_one:
            rel_cf = {
                "name": rel["related_entity"].lower(),
                "description": f"关联的{rel['related_entity']}数据",
                "access_pattern": "read_heavy"
            }
            schema["column_families"].append(rel_cf)
            schema["denormalization_strategy"].append(
                f"一对一关系:{rel['related_entity']}数据存储在独立列族中"
            )
        
        # 一对多关系处理
        for rel in one_to_many:
            if rel.get("embed", False):
                # 嵌入式存储
                rel_cf = {
                    "name": rel["related_entity"].lower() + "_list",
                    "description": f"嵌入的{rel['related_entity']}列表",
                    "access_pattern": "write_heavy",
                    "storage_format": "json_array"
                }
                schema["column_families"].append(rel_cf)
                schema["denormalization_strategy"].append(
                    f"一对多关系:{rel['related_entity']}数据以JSON数组形式嵌入存储"
                )
            else:
                # 分离式存储
                schema["denormalization_strategy"].append(
                    f"一对多关系:{rel['related_entity']}数据存储在单独表中,通过行键关联"
                )
        
        # 多对多关系处理
        for rel in many_to_many:
            schema["denormalization_strategy"].append(
                f"多对多关系:{rel['related_entity']}关系存储在独立的关联表中"
            )
        
        return schema
    
    def validate_schema_design(self, schema: Dict[str, Any]) -> Dict[str, Any]:
        """验证模式设计"""
        validation_result = {
            "valid": True,
            "warnings": [],
            "errors": [],
            "suggestions": []
        }
        
        # 检查行键设计
        row_key_design = schema.get("row_key_design", {})
        if not row_key_design:
            validation_result["errors"].append("缺少行键设计")
            validation_result["valid"] = False
        else:
            components = row_key_design.get("components", [])
            if len(components) > 5:
                validation_result["warnings"].append("行键组件过多,可能影响性能")
            
            if "timestamp" in components and components.index("timestamp") == 0:
                validation_result["warnings"].append("时间戳作为行键前缀可能导致热点问题")
        
        # 检查列族数量
        column_families = schema.get("column_families", [])
        if len(column_families) > 10:
            validation_result["warnings"].append("列族数量过多,建议合并相关列族")
        elif len(column_families) == 0:
            validation_result["errors"].append("至少需要一个列族")
            validation_result["valid"] = False
        
        # 检查反规范化策略
        strategies = schema.get("denormalization_strategy", [])
        if not strategies:
            validation_result["suggestions"].append("建议明确反规范化策略")
        
        return validation_result

# Usage example
optimizer = DataModelOptimizer()

print("=== Query-Driven Design Example ===")

# Register query patterns
query_patterns = [
    QueryPattern(
        QueryType.POINT_LOOKUP, 500.0, "user_id", 
        ["info:name", "info:email"], 1024, 5
    ),
    QueryPattern(
        QueryType.RANGE_SCAN, 50.0, "user_id_timestamp", 
        ["activity:*"], 10240, 50
    ),
    QueryPattern(
        QueryType.PREFIX_SCAN, 100.0, "user_id_prefix", 
        ["stats:*"], 5120, 20
    )
]

for pattern in query_patterns:
    optimizer.add_query_pattern(pattern)

# Analyze the query patterns
analysis = optimizer.analyze_query_patterns()
print(f"Query pattern analysis: {json.dumps(analysis, indent=2, ensure_ascii=False)}")

# Generate optimization recommendations
recommendations = optimizer.generate_optimization_recommendations()
print(f"\nRecommendations:")
for i, rec in enumerate(recommendations, 1):
    print(f"  {i}. {rec}")

# Design a denormalized schema
relationships = [
    {"type": "one_to_one", "related_entity": "Profile"},
    {"type": "one_to_many", "related_entity": "Order", "embed": True},
    {"type": "many_to_many", "related_entity": "Tag"}
]

denorm_schema = optimizer.design_denormalized_schema("User", relationships)
print(f"\nDenormalized schema: {json.dumps(denorm_schema, indent=2, ensure_ascii=False)}")

# Validate the schema design
validation = optimizer.validate_schema_design(denorm_schema)
print(f"\nSchema validation: {json.dumps(validation, indent=2, ensure_ascii=False)}")

4. Summary

This chapter took a deep look at HBase's data model and table design, covering:

Key Points

  1. Data model fundamentals

    • Core concepts: Cell, ColumnFamily, and HBaseTable
    • Data types and the serialization mechanism
    • Version control and TTL management
  2. Row key design

    • Sequential, hash-prefix, and reverse-timestamp patterns
    • Hotspot detection and distribution analysis
    • Salting and composite-key strategies
  3. Column family design

    • Access-pattern-driven optimization
    • Compression and caching strategies
    • Performance analysis and tuning recommendations
  4. Query-driven design

    • Query pattern analysis
    • Denormalization strategies
    • Schema validation and optimization

Design Principles

  • Query first: design the table structure around the queries it must serve
  • Avoid hotspots: design row keys for an even distribution (see the sketch after this list)
  • Separate column families: split column families by access pattern
  • Denormalize in moderation: balance query performance against storage cost
  • Control versions: set sensible version counts and TTLs
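
To see how these principles translate to a real cluster, here is a minimal sketch using the third-party happybase client. It assumes an HBase Thrift server reachable on localhost:9090; the table name, family options, and salt-bucket count are illustrative rather than prescriptive, and codec availability (e.g. SNAPPY) depends on the cluster.

import happybase
import hashlib

# Assumed endpoint: a local HBase Thrift server (adjust host/port for your cluster)
connection = happybase.Connection('localhost', port=9090)

# One read-heavy family and one TTL-bounded, write-heavy family, per the principles above
connection.create_table('user_data', {
    'profile': dict(max_versions=3, compression='SNAPPY',
                    bloom_filter_type='ROW', block_cache_enabled=True),
    'activity': dict(max_versions=1, compression='SNAPPY',
                     time_to_live=86400 * 7)  # 7-day TTL
})

def salted_key(user_id: str, buckets: int = 8) -> bytes:
    """Prepend a salt bucket so sequential user ids spread across regions."""
    salt = int(hashlib.md5(user_id.encode()).hexdigest(), 16) % buckets
    return f"{salt}_{user_id}".encode()

table = connection.table('user_data')
table.put(salted_key('user001'), {b'profile:name': b'Zhang San'})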

Best Practices

  • Analyze query requirements thoroughly during the design phase
  • Use tooling to verify row key distribution
  • Monitor and tune table performance regularly
  • Establish solid data modeling standards
  • Account for data growth and scalability

Next Steps

The next chapter covers basic HBase operations, including creating, reading, updating, and deleting data, as well as batch operations.