概述

表引擎是ClickHouse的核心特性之一,决定了数据如何存储和读取、支持哪些特性(索引、分区、并发访问、复制等),并直接影响查询性能、数据一致性和可用性。不同的表引擎适用于不同的使用场景,建表时选择合适的引擎对系统性能至关重要。
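
建表时通过ENGINE子句为每张表指定引擎,之后可以在system.tables系统表中查看各表实际使用的引擎。下面是一个最小示意(假设本机运行ClickHouse且已安装clickhouse-driver,库名仅为示例):

# 示意:查看某个库中各表所用的表引擎
from clickhouse_driver import Client

client = Client(host="localhost")  # 连接参数仅为示例
result = client.execute(
    "SELECT name, engine FROM system.tables WHERE database = %(db)s",
    {"db": "analytics"}
)
for name, engine in result:
    print(f"{name}: {engine}")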

MergeTree系列引擎

MergeTree引擎

MergeTree是ClickHouse最重要的表引擎,专为大数据量的写入和查询而设计:数据以不可变的数据片段(part)批量写入,后台线程再按排序键不断合并这些片段,这正是其名称的由来。

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
from datetime import datetime, timedelta
import json

class MergeTreeEngine(Enum):
    """MergeTree引擎类型"""
    MERGE_TREE = "MergeTree"
    REPLACING_MERGE_TREE = "ReplacingMergeTree"
    SUMMING_MERGE_TREE = "SummingMergeTree"
    AGGREGATING_MERGE_TREE = "AggregatingMergeTree"
    COLLAPSING_MERGE_TREE = "CollapsingMergeTree"
    VERSIONED_COLLAPSING_MERGE_TREE = "VersionedCollapsingMergeTree"
    GRAPHITE_MERGE_TREE = "GraphiteMergeTree"

class PartitionStrategy(Enum):
    """分区策略"""
    BY_DATE = "toDate(date_column)"
    BY_YEAR_MONTH = "toYYYYMM(date_column)"
    BY_YEAR_MONTH_DAY = "toYYYYMMDD(date_column)"
    BY_HOUR = "toStartOfHour(datetime_column)"
    BY_CUSTOM = "custom_expression"
    NO_PARTITION = ""

class CompressionCodec(Enum):
    """压缩编解码器"""
    LZ4 = "LZ4"
    LZ4HC = "LZ4HC"
    ZSTD = "ZSTD"
    DEFLATE_QPL = "DEFLATE_QPL"
    DELTA = "Delta"
    DOUBLE_DELTA = "DoubleDelta"
    GORILLA = "Gorilla"
    T64 = "T64"

@dataclass
class MergeTreeSettings:
    """MergeTree设置"""
    index_granularity: int = 8192
    index_granularity_bytes: int = 10485760  # 10MB
    min_index_granularity_bytes: int = 1024
    enable_mixed_granularity_parts: bool = True
    use_minimalistic_part_header_in_zookeeper: bool = True
    min_merge_bytes_to_use_direct_io: int = 10737418240  # 10GB
    merge_with_ttl_timeout: int = 86400  # 24 hours
    write_final_mark: bool = True
    merge_max_block_size: int = 8192
    storage_policy: str = "default"

@dataclass
class TableSchema:
    """表结构定义"""
    columns: List[Dict[str, str]]  # [{"name": "col1", "type": "String"}, ...]
    engine: MergeTreeEngine
    partition_by: str = ""
    order_by: Optional[List[str]] = None
    primary_key: Optional[List[str]] = None
    sample_by: str = ""
    ttl: str = ""
    settings: Optional[MergeTreeSettings] = None
    
    def __post_init__(self):
        if self.order_by is None:
            self.order_by = []
        if self.primary_key is None:
            self.primary_key = []
        if self.settings is None:
            self.settings = MergeTreeSettings()

@dataclass
class PartitionInfo:
    """分区信息"""
    partition_id: str
    name: str
    active: bool
    marks: int
    rows: int
    bytes_on_disk: int
    data_compressed_bytes: int
    data_uncompressed_bytes: int
    marks_bytes: int
    modification_time: datetime
    remove_time: Optional[datetime] = None
    refcount: int = 1
    min_date: Optional[str] = None
    max_date: Optional[str] = None

class ClickHouseMergeTreeManager:
    """ClickHouse MergeTree引擎管理器"""
    
    def __init__(self):
        self.table_schemas: Dict[str, TableSchema] = {}
        self.partition_info: Dict[str, List[PartitionInfo]] = {}
        self.engine_characteristics = self._initialize_engine_characteristics()
    
    def _initialize_engine_characteristics(self) -> Dict[MergeTreeEngine, Dict[str, Any]]:
        """初始化引擎特性"""
        return {
            MergeTreeEngine.MERGE_TREE: {
                "description": "基础MergeTree引擎,支持主键索引和分区",
                "use_cases": ["通用OLAP场景", "时间序列数据", "日志分析"],
                "advantages": ["高性能查询", "数据压缩", "并行处理"],
                "limitations": ["不支持更新", "删除操作有限"],
                "deduplication": False,
                "aggregation": False,
                "collapse": False
            },
            MergeTreeEngine.REPLACING_MERGE_TREE: {
                "description": "支持数据去重的MergeTree引擎",
                "use_cases": ["需要去重的数据", "维度表", "配置数据"],
                "advantages": ["自动去重", "保持最新数据"],
                "limitations": ["去重不是实时的", "需要手动OPTIMIZE"],
                "deduplication": True,
                "aggregation": False,
                "collapse": False
            },
            MergeTreeEngine.SUMMING_MERGE_TREE: {
                "description": "自动聚合数值列的MergeTree引擎",
                "use_cases": ["指标聚合", "计数器数据", "财务数据"],
                "advantages": ["自动求和", "减少存储空间"],
                "limitations": ["只能聚合数值列", "聚合不是实时的"],
                "deduplication": False,
                "aggregation": True,
                "collapse": False
            },
            MergeTreeEngine.AGGREGATING_MERGE_TREE: {
                "description": "支持复杂聚合函数的MergeTree引擎",
                "use_cases": ["复杂指标计算", "预聚合数据", "实时分析"],
                "advantages": ["支持复杂聚合", "高性能查询"],
                "limitations": ["需要AggregateFunction类型", "复杂度较高"],
                "deduplication": False,
                "aggregation": True,
                "collapse": False
            },
            MergeTreeEngine.COLLAPSING_MERGE_TREE: {
                "description": "支持行折叠的MergeTree引擎",
                "use_cases": ["状态变更跟踪", "增量更新", "事件流处理"],
                "advantages": ["支持逻辑删除", "状态跟踪"],
                "limitations": ["需要Sign列", "查询复杂"],
                "deduplication": False,
                "aggregation": False,
                "collapse": True
            },
            MergeTreeEngine.VERSIONED_COLLAPSING_MERGE_TREE: {
                "description": "带版本的折叠MergeTree引擎",
                "use_cases": ["版本化数据", "时间序列状态", "审计日志"],
                "advantages": ["版本控制", "状态跟踪", "数据一致性"],
                "limitations": ["需要Version列", "存储开销大"],
                "deduplication": False,
                "aggregation": False,
                "collapse": True
            },
            MergeTreeEngine.GRAPHITE_MERGE_TREE: {
                "description": "专为Graphite指标数据设计的引擎",
                "use_cases": ["Graphite指标", "时间序列监控", "性能指标"],
                "advantages": ["自动降采样", "数据保留策略"],
                "limitations": ["特定数据格式", "配置复杂"],
                "deduplication": False,
                "aggregation": True,
                "collapse": False
            }
        }
    
    def create_table_ddl(self, database: str, table_name: str, schema: TableSchema) -> str:
        """生成创建表的DDL语句"""
        ddl_parts = []
        
        # CREATE TABLE语句开始
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        # 列定义
        column_definitions = []
        for col in schema.columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'default' in col:
                col_def += f" DEFAULT {col['default']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        
        # ENGINE子句
        engine_clause = f"ENGINE = {schema.engine.value}"
        
        # 特殊引擎参数
        if schema.engine == MergeTreeEngine.REPLACING_MERGE_TREE:
            # 可以指定版本列
            version_col = next((col['name'] for col in schema.columns if col.get('is_version', False)), None)
            if version_col:
                engine_clause += f"({version_col})"
        elif schema.engine == MergeTreeEngine.SUMMING_MERGE_TREE:
            # 可以指定求和列
            sum_cols = [col['name'] for col in schema.columns if col.get('is_sum', False)]
            if sum_cols:
                engine_clause += f"({', '.join(sum_cols)})"
        elif schema.engine == MergeTreeEngine.COLLAPSING_MERGE_TREE:
            # 需要指定Sign列
            sign_col = next((col['name'] for col in schema.columns if col.get('is_sign', False)), 'Sign')
            engine_clause += f"({sign_col})"
        elif schema.engine == MergeTreeEngine.VERSIONED_COLLAPSING_MERGE_TREE:
            # 需要指定Sign和Version列
            sign_col = next((col['name'] for col in schema.columns if col.get('is_sign', False)), 'Sign')
            version_col = next((col['name'] for col in schema.columns if col.get('is_version', False)), 'Version')
            engine_clause += f"({sign_col}, {version_col})"
        
        ddl_parts.append(engine_clause)
        
        # PARTITION BY子句
        if schema.partition_by:
            ddl_parts.append(f"PARTITION BY {schema.partition_by}")
        
        # ORDER BY子句(必需)
        if schema.order_by:
            ddl_parts.append(f"ORDER BY ({', '.join(schema.order_by)})")
        
        # PRIMARY KEY子句(可选)
        if schema.primary_key and schema.primary_key != schema.order_by:
            ddl_parts.append(f"PRIMARY KEY ({', '.join(schema.primary_key)})")
        
        # SAMPLE BY子句(可选)
        if schema.sample_by:
            ddl_parts.append(f"SAMPLE BY {schema.sample_by}")
        
        # TTL子句(可选)
        if schema.ttl:
            ddl_parts.append(f"TTL {schema.ttl}")
        
        # SETTINGS子句
        settings_list = []
        if schema.settings.index_granularity != 8192:
            settings_list.append(f"index_granularity = {schema.settings.index_granularity}")
        if schema.settings.storage_policy != "default":
            settings_list.append(f"storage_policy = '{schema.settings.storage_policy}'")
        
        if settings_list:
            ddl_parts.append(f"SETTINGS {', '.join(settings_list)}")
        
        return "\n".join(ddl_parts) + ";"
    
    def recommend_engine(self, use_case: str, data_characteristics: Dict[str, Any]) -> Dict[str, Any]:
        """根据使用场景推荐表引擎"""
        recommendations = []
        
        # 分析数据特征
        has_duplicates = data_characteristics.get('has_duplicates', False)
        needs_aggregation = data_characteristics.get('needs_aggregation', False)
        has_updates = data_characteristics.get('has_updates', False)
        data_volume = data_characteristics.get('data_volume', 'medium')  # small, medium, large
        query_pattern = data_characteristics.get('query_pattern', 'analytical')  # analytical, transactional
        
        # 基于使用场景推荐
        if use_case.lower() in ['log_analysis', 'time_series', 'analytics']:
            if has_duplicates:
                recommendations.append({
                    'engine': MergeTreeEngine.REPLACING_MERGE_TREE,
                    'score': 90,
                    'reason': '日志数据通常有重复,ReplacingMergeTree可以自动去重'
                })
            else:
                recommendations.append({
                    'engine': MergeTreeEngine.MERGE_TREE,
                    'score': 95,
                    'reason': '标准MergeTree适合大多数分析场景'
                })
        
        elif use_case.lower() in ['metrics', 'counters', 'aggregation']:
            if needs_aggregation:
                recommendations.append({
                    'engine': MergeTreeEngine.SUMMING_MERGE_TREE,
                    'score': 95,
                    'reason': 'SummingMergeTree可以自动聚合数值指标'
                })
                recommendations.append({
                    'engine': MergeTreeEngine.AGGREGATING_MERGE_TREE,
                    'score': 85,
                    'reason': 'AggregatingMergeTree支持复杂聚合函数'
                })
            else:
                recommendations.append({
                    'engine': MergeTreeEngine.MERGE_TREE,
                    'score': 80,
                    'reason': '基础MergeTree适合简单指标存储'
                })
        
        elif use_case.lower() in ['state_tracking', 'changelog', 'events']:
            recommendations.append({
                'engine': MergeTreeEngine.COLLAPSING_MERGE_TREE,
                'score': 90,
                'reason': 'CollapsingMergeTree适合状态变更跟踪'
            })
            if has_updates:
                recommendations.append({
                    'engine': MergeTreeEngine.VERSIONED_COLLAPSING_MERGE_TREE,
                    'score': 85,
                    'reason': 'VersionedCollapsingMergeTree支持版本化更新'
                })
        
        elif use_case.lower() in ['dimension_table', 'reference_data']:
            recommendations.append({
                'engine': MergeTreeEngine.REPLACING_MERGE_TREE,
                'score': 95,
                'reason': 'ReplacingMergeTree适合维度表的数据更新'
            })
        
        # 根据数据量调整推荐
        if data_volume == 'large':
            for rec in recommendations:
                if rec['engine'] in [MergeTreeEngine.MERGE_TREE, MergeTreeEngine.REPLACING_MERGE_TREE]:
                    rec['score'] += 5
                    rec['reason'] += ',且对大数据量有良好支持'
        
        # 排序推荐结果
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        
        return {
            'primary_recommendation': recommendations[0] if recommendations else None,
            'all_recommendations': recommendations,
            'analysis': {
                'use_case': use_case,
                'data_characteristics': data_characteristics,
                'recommendation_count': len(recommendations)
            }
        }
    
    def generate_partition_strategy(self, date_column: str, data_pattern: str) -> Dict[str, Any]:
        """生成分区策略建议"""
        strategies = []
        
        if data_pattern == 'daily_heavy':
            strategies.append({
                'strategy': PartitionStrategy.BY_YEAR_MONTH_DAY,
                'expression': f"toYYYYMMDD({date_column})",
                'score': 95,
                'pros': ['精确的日级分区', '便于删除历史数据', '查询性能好'],
                'cons': ['分区数量多', '小文件问题'],
                'suitable_for': '每日数据量大(>1GB)的场景'
            })
        
        elif data_pattern == 'monthly_moderate':
            strategies.append({
                'strategy': PartitionStrategy.BY_YEAR_MONTH,
                'expression': f"toYYYYMM({date_column})",
                'score': 90,
                'pros': ['平衡的分区大小', '管理简单', '查询效率高'],
                'cons': ['删除粒度较粗'],
                'suitable_for': '月度数据量适中(100MB-10GB)的场景'
            })
        
        elif data_pattern == 'hourly_intensive':
            strategies.append({
                'strategy': PartitionStrategy.BY_HOUR,
                'expression': f"toYYYYMMDDhh({date_column})",
                'score': 85,
                'pros': ['细粒度分区', '实时数据处理', '便于小时级分析'],
                'cons': ['分区数量极多', '管理复杂'],
                'suitable_for': '高频实时数据(每小时>100MB)的场景'
            })
        
        else:  # default
            strategies.append({
                'strategy': PartitionStrategy.BY_YEAR_MONTH,
                'expression': f"toYYYYMM({date_column})",
                'score': 80,
                'pros': ['通用性好', '管理简单'],
                'cons': ['可能不是最优'],
                'suitable_for': '通用场景'
            })
        
        # 添加无分区选项
        strategies.append({
            'strategy': PartitionStrategy.NO_PARTITION,
            'expression': '',
            'score': 60,
            'pros': ['简单', '无分区开销'],
            'cons': ['无法利用分区裁剪', '难以按时间批量删除历史数据'],
            'suitable_for': '小数据量(<1GB)或测试场景'
        })
        
        strategies.sort(key=lambda x: x['score'], reverse=True)
        
        return {
            'recommended_strategy': strategies[0],
            'all_strategies': strategies,
            'analysis': {
                'date_column': date_column,
                'data_pattern': data_pattern
            }
        }
    
    def analyze_table_performance(self, table_name: str, partition_info: List[PartitionInfo]) -> Dict[str, Any]:
        """分析表性能"""
        if not partition_info:
            return {'error': '没有分区信息'}
        
        total_rows = sum(p.rows for p in partition_info)
        total_bytes = sum(p.bytes_on_disk for p in partition_info)
        total_compressed = sum(p.data_compressed_bytes for p in partition_info)
        total_uncompressed = sum(p.data_uncompressed_bytes for p in partition_info)
        
        active_partitions = [p for p in partition_info if p.active]
        inactive_partitions = [p for p in partition_info if not p.active]
        
        # 计算压缩比
        compression_ratio = total_uncompressed / total_compressed if total_compressed > 0 else 0
        
        # 分析分区大小分布
        partition_sizes = [p.bytes_on_disk for p in active_partitions]
        avg_partition_size = sum(partition_sizes) / len(partition_sizes) if partition_sizes else 0
        max_partition_size = max(partition_sizes) if partition_sizes else 0
        min_partition_size = min(partition_sizes) if partition_sizes else 0
        
        # 性能评估
        performance_score = 100
        issues = []
        recommendations = []
        
        # 检查分区数量
        if len(active_partitions) > 1000:
            performance_score -= 20
            issues.append(f"分区数量过多({len(active_partitions)}),可能影响查询性能")
            recommendations.append("考虑调整分区策略,减少分区数量")
        
        # 检查分区大小不均
        if max_partition_size > avg_partition_size * 10:
            performance_score -= 15
            issues.append("分区大小不均匀,存在热点分区")
            recommendations.append("检查数据分布,考虑重新设计分区键")
        
        # 检查小分区
        small_partitions = [p for p in active_partitions if p.bytes_on_disk < 100 * 1024 * 1024]  # <100MB
        if len(small_partitions) > len(active_partitions) * 0.3:
            performance_score -= 10
            issues.append(f"小分区过多({len(small_partitions)}),可能影响查询效率")
            recommendations.append("考虑合并小分区或调整分区策略")
        
        # 检查压缩比
        if compression_ratio < 3:
            performance_score -= 10
            issues.append(f"压缩比较低({compression_ratio:.2f})")
            recommendations.append("考虑使用更高效的压缩算法")
        
        return {
            'table_name': table_name,
            'summary': {
                'total_partitions': len(partition_info),
                'active_partitions': len(active_partitions),
                'inactive_partitions': len(inactive_partitions),
                'total_rows': total_rows,
                'total_size_bytes': total_bytes,
                'compression_ratio': compression_ratio,
                'avg_partition_size_mb': avg_partition_size / (1024 * 1024),
                'max_partition_size_mb': max_partition_size / (1024 * 1024),
                'min_partition_size_mb': min_partition_size / (1024 * 1024)
            },
            'performance': {
                'score': max(0, performance_score),
                'issues': issues,
                'recommendations': recommendations
            },
            'partition_distribution': {
                'by_size': {
                    'small_partitions': len([p for p in active_partitions if p.bytes_on_disk < 100 * 1024 * 1024]),
                    'medium_partitions': len([p for p in active_partitions if 100 * 1024 * 1024 <= p.bytes_on_disk < 1024 * 1024 * 1024]),
                    'large_partitions': len([p for p in active_partitions if p.bytes_on_disk >= 1024 * 1024 * 1024])
                },
                'by_rows': {
                    'small_partitions': len([p for p in active_partitions if p.rows < 1000000]),
                    'medium_partitions': len([p for p in active_partitions if 1000000 <= p.rows < 10000000]),
                    'large_partitions': len([p for p in active_partitions if p.rows >= 10000000])
                }
            }
        }
    
    def generate_optimization_suggestions(self, table_name: str, engine: MergeTreeEngine, 
                                        performance_analysis: Dict[str, Any]) -> List[Dict[str, str]]:
        """生成优化建议"""
        suggestions = []
        
        performance_score = performance_analysis.get('performance', {}).get('score', 100)
        issues = performance_analysis.get('performance', {}).get('issues', [])
        
        # 基于性能评分的建议
        if performance_score < 70:
            suggestions.append({
                'category': 'critical',
                'title': '表性能严重问题',
                'description': '表存在严重性能问题,需要立即优化',
                'action': '检查分区策略、索引设计和查询模式'
            })
        
        # 基于引擎类型的建议
        if engine == MergeTreeEngine.REPLACING_MERGE_TREE:
            suggestions.append({
                'category': 'maintenance',
                'title': 'ReplacingMergeTree维护',
                'description': '定期执行OPTIMIZE操作以触发去重',
                'action': f'OPTIMIZE TABLE {table_name} FINAL'
            })
        
        elif engine == MergeTreeEngine.SUMMING_MERGE_TREE:
            suggestions.append({
                'category': 'maintenance',
                'title': 'SummingMergeTree维护',
                'description': '定期执行OPTIMIZE操作以触发聚合',
                'action': f'OPTIMIZE TABLE {table_name} FINAL'
            })
        
        # 基于分区分析的建议
        partition_summary = performance_analysis.get('summary', {})
        if partition_summary.get('total_partitions', 0) > 1000:
            suggestions.append({
                'category': 'optimization',
                'title': '分区数量优化',
                'description': '分区数量过多,考虑调整分区策略',
                'action': '重新设计分区键,减少分区粒度'
            })
        
        if partition_summary.get('compression_ratio', 0) < 3:
            suggestions.append({
                'category': 'optimization',
                'title': '压缩优化',
                'description': '压缩比较低,可以提高存储效率',
                'action': '考虑使用ZSTD或LZ4HC压缩算法'
            })
        
        # 基于具体问题的建议
        for issue in issues:
            if '小分区' in issue:
                suggestions.append({
                    'category': 'optimization',
                    'title': '小分区合并',
                    'description': issue,
                    'action': '调整分区键粒度(如由按天改为按月),必要时重建表并重新导入数据'
                })
            
            elif '分区大小不均' in issue:
                suggestions.append({
                    'category': 'optimization',
                    'title': '数据分布优化',
                    'description': issue,
                    'action': '分析数据分布模式,重新设计分区键'
                })
        
        return suggestions

# MergeTree引擎管理示例
print("\n=== ClickHouse MergeTree引擎管理 ===")

merge_tree_manager = ClickHouseMergeTreeManager()

print("\n1. 引擎特性对比:")
for engine, characteristics in list(merge_tree_manager.engine_characteristics.items())[:3]:
    print(f"\n   {engine.value}:")
    print(f"     描述: {characteristics['description']}")
    print(f"     适用场景: {', '.join(characteristics['use_cases'][:2])}")
    print(f"     主要优势: {', '.join(characteristics['advantages'][:2])}")
    print(f"     去重支持: {characteristics['deduplication']}")
    print(f"     聚合支持: {characteristics['aggregation']}")

print("\n2. 表结构定义:")
# 定义用户行为分析表
user_behavior_schema = TableSchema(
    columns=[
        {"name": "user_id", "type": "UInt64", "comment": "用户ID"},
        {"name": "event_time", "type": "DateTime", "comment": "事件时间"},
        {"name": "event_type", "type": "String", "comment": "事件类型"},
        {"name": "page_url", "type": "String", "comment": "页面URL"},
        {"name": "session_id", "type": "String", "comment": "会话ID"},
        {"name": "duration", "type": "UInt32", "comment": "停留时长(秒)"}
    ],
    engine=MergeTreeEngine.MERGE_TREE,
    partition_by="toYYYYMM(event_time)",
    order_by=["user_id", "event_time"],
    primary_key=["user_id"],
    sample_by="user_id"
)

ddl_sql = merge_tree_manager.create_table_ddl("analytics", "user_behavior", user_behavior_schema)
print(f"\n   用户行为表DDL:")
print(f"     {ddl_sql[:300]}...")

print("\n3. 引擎推荐:")
test_cases = [
    {
        'use_case': 'log_analysis',
        'characteristics': {
            'has_duplicates': True,
            'data_volume': 'large',
            'query_pattern': 'analytical'
        }
    },
    {
        'use_case': 'metrics',
        'characteristics': {
            'needs_aggregation': True,
            'data_volume': 'medium',
            'query_pattern': 'analytical'
        }
    },
    {
        'use_case': 'dimension_table',
        'characteristics': {
            'has_duplicates': True,
            'has_updates': True,
            'data_volume': 'small'
        }
    }
]

for i, test_case in enumerate(test_cases, 1):
    recommendation = merge_tree_manager.recommend_engine(
        test_case['use_case'], 
        test_case['characteristics']
    )
    
    print(f"\n   场景 {i} - {test_case['use_case']}:")
    if recommendation['primary_recommendation']:
        primary = recommendation['primary_recommendation']
        print(f"     推荐引擎: {primary['engine'].value}")
        print(f"     推荐评分: {primary['score']}/100")
        print(f"     推荐理由: {primary['reason']}")
    
    print(f"     备选方案: {len(recommendation['all_recommendations']) - 1} 个")

print("\n4. 分区策略建议:")
partition_scenarios = [
    {'date_column': 'created_at', 'pattern': 'daily_heavy'},
    {'date_column': 'event_time', 'pattern': 'monthly_moderate'},
    {'date_column': 'log_time', 'pattern': 'hourly_intensive'}
]

for i, scenario in enumerate(partition_scenarios, 1):
    strategy = merge_tree_manager.generate_partition_strategy(
        scenario['date_column'], 
        scenario['pattern']
    )
    
    print(f"\n   场景 {i} - {scenario['pattern']}:")
    recommended = strategy['recommended_strategy']
    print(f"     推荐策略: {recommended['strategy'].value}")
    print(f"     分区表达式: {recommended['expression']}")
    print(f"     评分: {recommended['score']}/100")
    print(f"     适用场景: {recommended['suitable_for']}")
    print(f"     主要优势: {', '.join(recommended['pros'][:2])}")

print("\n5. 模拟分区信息:")
# 创建模拟分区数据
sample_partitions = [
    PartitionInfo(
        partition_id="202401",
        name="202401_1_100_2",
        active=True,
        marks=1250,
        rows=10000000,
        bytes_on_disk=500 * 1024 * 1024,  # 500MB
        data_compressed_bytes=400 * 1024 * 1024,
        data_uncompressed_bytes=1200 * 1024 * 1024,
        marks_bytes=1024 * 1024,
        modification_time=datetime.now() - timedelta(days=1)
    ),
    PartitionInfo(
        partition_id="202402",
        name="202402_1_80_1",
        active=True,
        marks=1000,
        rows=8000000,
        bytes_on_disk=400 * 1024 * 1024,  # 400MB
        data_compressed_bytes=320 * 1024 * 1024,
        data_uncompressed_bytes=960 * 1024 * 1024,
        marks_bytes=800 * 1024,
        modification_time=datetime.now() - timedelta(hours=12)
    ),
    PartitionInfo(
        partition_id="202403",
        name="202403_1_50_1",
        active=True,
        marks=625,
        rows=5000000,
        bytes_on_disk=250 * 1024 * 1024,  # 250MB
        data_compressed_bytes=200 * 1024 * 1024,
        data_uncompressed_bytes=600 * 1024 * 1024,
        marks_bytes=500 * 1024,
        modification_time=datetime.now() - timedelta(hours=6)
    )
]

performance_analysis = merge_tree_manager.analyze_table_performance(
    "user_behavior", 
    sample_partitions
)

print(f"\n   表性能分析:")
summary = performance_analysis['summary']
print(f"     总分区数: {summary['total_partitions']}")
print(f"     活跃分区数: {summary['active_partitions']}")
print(f"     总行数: {summary['total_rows']:,}")
print(f"     总大小: {summary['total_size_bytes'] / (1024*1024*1024):.2f} GB")
print(f"     压缩比: {summary['compression_ratio']:.2f}")
print(f"     平均分区大小: {summary['avg_partition_size_mb']:.1f} MB")

performance = performance_analysis['performance']
print(f"\n   性能评估:")
print(f"     性能评分: {performance['score']}/100")
if performance['issues']:
    print(f"     发现问题: {len(performance['issues'])} 个")
    for issue in performance['issues'][:2]:
        print(f"       - {issue}")
if performance['recommendations']:
    print(f"     优化建议: {len(performance['recommendations'])} 个")
    for rec in performance['recommendations'][:2]:
        print(f"       - {rec}")

print("\n6. 优化建议生成:")
optimization_suggestions = merge_tree_manager.generate_optimization_suggestions(
    "user_behavior",
    MergeTreeEngine.MERGE_TREE,
    performance_analysis
)

for i, suggestion in enumerate(optimization_suggestions[:3], 1):
    print(f"\n   建议 {i} [{suggestion['category']}]:")
    print(f"     标题: {suggestion['title']}")
    print(f"     描述: {suggestion['description']}")
    print(f"     操作: {suggestion['action']}")

print("\n7. 不同引擎的DDL对比:")
# ReplacingMergeTree示例
replacing_schema = TableSchema(
    columns=[
        {"name": "id", "type": "UInt64"},
        {"name": "name", "type": "String"},
        {"name": "email", "type": "String"},
        {"name": "updated_at", "type": "DateTime", "is_version": True}
    ],
    engine=MergeTreeEngine.REPLACING_MERGE_TREE,
    order_by=["id"]
)

replacing_ddl = merge_tree_manager.create_table_ddl("test", "users", replacing_schema)
print(f"\n   ReplacingMergeTree:")
print(f"     {replacing_ddl[:200]}...")

# SummingMergeTree示例
summing_schema = TableSchema(
    columns=[
        {"name": "date", "type": "Date"},
        {"name": "user_id", "type": "UInt64"},
        {"name": "clicks", "type": "UInt64", "is_sum": True},
        {"name": "impressions", "type": "UInt64", "is_sum": True}
    ],
    engine=MergeTreeEngine.SUMMING_MERGE_TREE,
    partition_by="toYYYYMM(date)",
    order_by=["date", "user_id"]
)

summing_ddl = merge_tree_manager.create_table_ddl("analytics", "user_metrics", summing_schema)
print(f"\n   SummingMergeTree:")
print(f"     {summing_ddl[:200]}...")

Log系列引擎

Log系列引擎适用于小数据量(通常在百万行以内)的场景,不支持索引和分区,优点是实现简单、写入开销小。
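
下面先用一个最小示意展示Log引擎典型的建表、写入、读取流程(假设本机运行ClickHouse且已安装clickhouse-driver,库表名仅为示例):

# 示意:用TinyLog存放一次性导入的小数据集
from clickhouse_driver import Client

client = Client(host="localhost")
client.execute(
    "CREATE TABLE IF NOT EXISTS default.tmp_import (id UInt32, note String) ENGINE = TinyLog"
)
client.execute("INSERT INTO default.tmp_import (id, note) VALUES", [(1, "first"), (2, "second")])
print(client.execute("SELECT count() FROM default.tmp_import"))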

class LogEngine(Enum):
    """Log引擎类型"""
    TINY_LOG = "TinyLog"
    STRIPE_LOG = "StripeLog"
    LOG = "Log"

class ClickHouseLogEngineManager:
    """ClickHouse Log引擎管理器"""
    
    def __init__(self):
        self.log_engine_characteristics = self._initialize_log_characteristics()
    
    def _initialize_log_characteristics(self) -> Dict[LogEngine, Dict[str, Any]]:
        """初始化Log引擎特性"""
        return {
            LogEngine.TINY_LOG: {
                "description": "最简单的表引擎,数据存储在单个文件中",
                "use_cases": ["临时数据", "小数据集", "测试环境"],
                "advantages": ["简单", "快速写入", "无锁"],
                "limitations": ["无索引", "无并发读写", "无分区"],
                "max_recommended_size": "1GB",
                "concurrent_readers": 1,
                "concurrent_writers": 1,
                "supports_alter": False
            },
            LogEngine.STRIPE_LOG: {
                "description": "数据按列存储在不同文件中,支持并发读取",
                "use_cases": ["中等数据集", "多读少写", "分析查询"],
                "advantages": ["并发读取", "列式存储", "压缩效果好"],
                "limitations": ["无索引", "写入需要锁", "无分区"],
                "max_recommended_size": "10GB",
                "concurrent_readers": "unlimited",
                "concurrent_writers": 1,
                "supports_alter": True
            },
            LogEngine.LOG: {
                "description": "支持并发读写的Log引擎",
                "use_cases": ["中等数据集", "并发访问", "ETL过程"],
                "advantages": ["并发读写", "列式存储", "支持ALTER"],
                "limitations": ["无索引", "无分区", "性能有限"],
                "max_recommended_size": "10GB",
                "concurrent_readers": "unlimited",
                "concurrent_writers": "unlimited",
                "supports_alter": True
            }
        }
    
    def recommend_log_engine(self, data_size_gb: float, concurrent_readers: int, 
                           concurrent_writers: int, needs_alter: bool) -> Dict[str, Any]:
        """推荐Log引擎"""
        recommendations = []
        
        # TinyLog评估
        if data_size_gb <= 1 and concurrent_readers <= 1 and concurrent_writers <= 1:
            score = 95 if not needs_alter else 85
            recommendations.append({
                'engine': LogEngine.TINY_LOG,
                'score': score,
                'reason': '数据量小且无并发需求,TinyLog最简单高效'
            })
        
        # StripeLog评估
        if data_size_gb <= 10 and concurrent_writers <= 1:
            score = 90 if concurrent_readers > 1 else 80
            if needs_alter:
                score += 5
            recommendations.append({
                'engine': LogEngine.STRIPE_LOG,
                'score': score,
                'reason': '支持并发读取和ALTER操作,适合多读少写场景'
            })
        
        # Log评估
        if data_size_gb <= 10:
            score = 85
            if concurrent_writers > 1:
                score += 10
            if needs_alter:
                score += 5
            recommendations.append({
                'engine': LogEngine.LOG,
                'score': score,
                'reason': '支持并发读写,功能最完整的Log引擎'
            })
        
        # 如果数据量太大,建议使用MergeTree
        if data_size_gb > 10:
            recommendations.append({
                'engine': 'MergeTree',
                'score': 95,
                'reason': '数据量较大,建议使用MergeTree系列引擎'
            })
        
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        
        return {
            'primary_recommendation': recommendations[0] if recommendations else None,
            'all_recommendations': recommendations,
            'analysis': {
                'data_size_gb': data_size_gb,
                'concurrent_readers': concurrent_readers,
                'concurrent_writers': concurrent_writers,
                'needs_alter': needs_alter
            }
        }
    
    def create_log_table_ddl(self, database: str, table_name: str, 
                           columns: List[Dict[str, str]], engine: LogEngine) -> str:
        """创建Log表的DDL"""
        ddl_parts = []
        
        # CREATE TABLE语句
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        # 列定义
        column_definitions = []
        for col in columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'default' in col:
                col_def += f" DEFAULT {col['default']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        
        # ENGINE子句
        ddl_parts.append(f"ENGINE = {engine.value}")
        
        return "\n".join(ddl_parts) + ";"

# Log引擎管理示例
print("\n\n=== ClickHouse Log引擎管理 ===")

log_engine_manager = ClickHouseLogEngineManager()

print("\n1. Log引擎特性对比:")
for engine, characteristics in log_engine_manager.log_engine_characteristics.items():
    print(f"\n   {engine.value}:")
    print(f"     描述: {characteristics['description']}")
    print(f"     最大推荐大小: {characteristics['max_recommended_size']}")
    print(f"     并发读取: {characteristics['concurrent_readers']}")
    print(f"     并发写入: {characteristics['concurrent_writers']}")
    print(f"     支持ALTER: {characteristics['supports_alter']}")
    print(f"     适用场景: {', '.join(characteristics['use_cases'])}")

print("\n2. Log引擎推荐:")
log_test_cases = [
    {'size': 0.5, 'readers': 1, 'writers': 1, 'alter': False},
    {'size': 5, 'readers': 10, 'writers': 1, 'alter': True},
    {'size': 8, 'readers': 5, 'writers': 3, 'alter': True},
    {'size': 15, 'readers': 2, 'writers': 1, 'alter': False}
]

for i, case in enumerate(log_test_cases, 1):
    recommendation = log_engine_manager.recommend_log_engine(
        case['size'], case['readers'], case['writers'], case['alter']
    )
    
    print(f"\n   场景 {i} (数据量: {case['size']}GB, 读者: {case['readers']}, 写者: {case['writers']}):")
    if recommendation['primary_recommendation']:
        primary = recommendation['primary_recommendation']
        print(f"     推荐引擎: {primary['engine']}")
        print(f"     推荐评分: {primary['score']}/100")
        print(f"     推荐理由: {primary['reason']}")

print("\n3. Log表DDL示例:")
# 创建临时日志表
temp_log_columns = [
    {"name": "timestamp", "type": "DateTime", "comment": "时间戳"},
    {"name": "level", "type": "String", "comment": "日志级别"},
    {"name": "message", "type": "String", "comment": "日志消息"},
    {"name": "source", "type": "String", "comment": "日志来源"}
]

for engine in [LogEngine.TINY_LOG, LogEngine.STRIPE_LOG]:
    ddl = log_engine_manager.create_log_table_ddl(
        "logs", f"temp_{engine.value.lower()}", temp_log_columns, engine
    )
    print(f"\n   {engine.value}表:")
    print(f"     {ddl[:150]}...")

Integration引擎

Integration引擎用于与外部系统集成,在ClickHouse中以表的形式直接读取或写入外部数据源,便于数据导入导出和跨系统查询。

class IntegrationEngine(Enum):
    """集成引擎类型"""
    KAFKA = "Kafka"
    RABBIT_MQ = "RabbitMQ"
    MYSQL = "MySQL"
    POSTGRESQL = "PostgreSQL"
    MONGODB = "MongoDB"
    HDFS = "HDFS"
    S3 = "S3"
    URL = "URL"
    FILE = "File"
    JDBC = "JDBC"

@dataclass
class IntegrationConfig:
    """集成配置"""
    engine: IntegrationEngine
    connection_params: Dict[str, Any]
    format: str = "JSONEachRow"
    settings: Optional[Dict[str, Any]] = None
    
    def __post_init__(self):
        if self.settings is None:
            self.settings = {}

class ClickHouseIntegrationManager:
    """ClickHouse集成引擎管理器"""
    
    def __init__(self):
        self.integration_characteristics = self._initialize_integration_characteristics()
    
    def _initialize_integration_characteristics(self) -> Dict[IntegrationEngine, Dict[str, Any]]:
        """初始化集成引擎特性"""
        return {
            IntegrationEngine.KAFKA: {
                "description": "Apache Kafka消息队列集成",
                "use_cases": ["实时数据流", "事件驱动架构", "微服务通信"],
                "advantages": ["实时性", "高吞吐量", "容错性"],
                "limitations": ["需要Kafka集群", "配置复杂"],
                "data_direction": "read",
                "real_time": True,
                "scalability": "high"
            },
            IntegrationEngine.MYSQL: {
                "description": "MySQL数据库集成",
                "use_cases": ["数据迁移", "实时同步", "混合查询"],
                "advantages": ["广泛支持", "成熟稳定", "易于使用"],
                "limitations": ["性能限制", "网络延迟"],
                "data_direction": "read/write",
                "real_time": False,
                "scalability": "medium"
            },
            IntegrationEngine.S3: {
                "description": "Amazon S3对象存储集成",
                "use_cases": ["数据湖", "备份存储", "批量导入"],
                "advantages": ["无限容量", "成本低", "高可用"],
                "limitations": ["网络依赖", "延迟较高"],
                "data_direction": "read/write",
                "real_time": False,
                "scalability": "very_high"
            },
            IntegrationEngine.HDFS: {
                "description": "Hadoop分布式文件系统集成",
                "use_cases": ["大数据处理", "数据湖", "批量分析"],
                "advantages": ["大数据支持", "分布式", "容错性"],
                "limitations": ["复杂部署", "Java依赖"],
                "data_direction": "read/write",
                "real_time": False,
                "scalability": "very_high"
            },
            IntegrationEngine.URL: {
                "description": "HTTP/HTTPS URL数据源集成",
                "use_cases": ["API数据", "Web数据", "外部服务"],
                "advantages": ["简单易用", "广泛支持", "灵活性"],
                "limitations": ["网络依赖", "安全性"],
                "data_direction": "read",
                "real_time": False,
                "scalability": "low"
            }
        }
    
    def create_integration_table_ddl(self, database: str, table_name: str,
                                    columns: List[Dict[str, str]], 
                                    config: IntegrationConfig) -> str:
        """创建集成表DDL"""
        ddl_parts = []
        
        # CREATE TABLE语句
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        # 列定义
        column_definitions = []
        for col in columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        
        # ENGINE子句
        engine_clause = f"ENGINE = {config.engine.value}("
        
        # 根据引擎类型构建参数
        if config.engine == IntegrationEngine.KAFKA:
            params = [
                f"'{config.connection_params.get('kafka_broker_list', 'localhost:9092')}'",
                f"'{config.connection_params.get('kafka_topic_list', 'topic')}'",
                f"'{config.connection_params.get('kafka_group_name', 'clickhouse')}'",
                f"'{config.format}'"
            ]
            engine_clause += ", ".join(params)
        
        elif config.engine == IntegrationEngine.MYSQL:
            params = [
                f"'{config.connection_params.get('host', 'localhost')}:{config.connection_params.get('port', 3306)}'",
                f"'{config.connection_params.get('database', 'test')}'",
                f"'{config.connection_params.get('table', table_name)}'",
                f"'{config.connection_params.get('user', 'root')}'",
                f"'{config.connection_params.get('password', '')}'"
            ]
            engine_clause += ", ".join(params)
        
        elif config.engine == IntegrationEngine.S3:
            params = [
                f"'{config.connection_params.get('url', 's3://bucket/path/')}'",
                f"'{config.connection_params.get('access_key_id', '')}'",
                f"'{config.connection_params.get('secret_access_key', '')}'",
                f"'{config.format}'"
            ]
            engine_clause += ", ".join(params)
        
        elif config.engine == IntegrationEngine.URL:
            params = [
                f"'{config.connection_params.get('url', 'http://example.com/data')}'",
                f"'{config.format}'"
            ]
            engine_clause += ", ".join(params)
        
        engine_clause += ")"
        ddl_parts.append(engine_clause)
        
        # SETTINGS子句
        if config.settings:
            settings_list = [f"{k} = {v}" for k, v in config.settings.items()]
            ddl_parts.append(f"SETTINGS {', '.join(settings_list)}")
        
        return "\n".join(ddl_parts) + ";"
    
    def recommend_integration_engine(self, use_case: str, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """推荐集成引擎"""
        recommendations = []
        
        real_time_required = requirements.get('real_time', False)
        data_volume = requirements.get('data_volume', 'medium')  # small, medium, large
        data_source = requirements.get('data_source', 'unknown')
        budget = requirements.get('budget', 'medium')  # low, medium, high
        
        # 基于数据源推荐
        if 'kafka' in data_source.lower() or 'stream' in use_case.lower():
            score = 95 if real_time_required else 80
            recommendations.append({
                'engine': IntegrationEngine.KAFKA,
                'score': score,
                'reason': 'Kafka适合实时流数据处理'
            })
        
        if 'mysql' in data_source.lower() or 'database' in use_case.lower():
            score = 90 if not real_time_required else 70
            recommendations.append({
                'engine': IntegrationEngine.MYSQL,
                'score': score,
                'reason': 'MySQL适合关系型数据集成'
            })
        
        if 's3' in data_source.lower() or 'object' in use_case.lower():
            score = 85 if data_volume == 'large' else 75
            if budget == 'low':
                score += 10
            recommendations.append({
                'engine': IntegrationEngine.S3,
                'score': score,
                'reason': 'S3适合大规模数据存储和成本优化'
            })
        
        if 'hadoop' in data_source.lower() or 'hdfs' in data_source.lower():
            score = 90 if data_volume == 'large' else 70
            recommendations.append({
                'engine': IntegrationEngine.HDFS,
                'score': score,
                'reason': 'HDFS适合大数据生态系统集成'
            })
        
        if 'api' in use_case.lower() or 'http' in data_source.lower():
            score = 80 if data_volume == 'small' else 60
            recommendations.append({
                'engine': IntegrationEngine.URL,
                'score': score,
                'reason': 'URL引擎适合API数据访问'
            })
        
        # 通用推荐
        if not recommendations:
            if real_time_required:
                recommendations.append({
                    'engine': IntegrationEngine.KAFKA,
                    'score': 75,
                    'reason': '实时需求推荐使用Kafka'
                })
            else:
                recommendations.append({
                    'engine': IntegrationEngine.S3,
                    'score': 70,
                    'reason': '通用场景推荐使用S3'
                })
        
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        
        return {
            'primary_recommendation': recommendations[0] if recommendations else None,
            'all_recommendations': recommendations,
            'analysis': {
                'use_case': use_case,
                'requirements': requirements
            }
        }
    
    def generate_integration_examples(self) -> Dict[str, str]:
        """生成集成示例"""
        examples = {}
        
        # Kafka集成示例
        examples['kafka_real_time'] = """
-- Kafka实时数据流表
CREATE TABLE analytics.user_events (
    user_id UInt64,
    event_type String,
    timestamp DateTime,
    properties String
) ENGINE = Kafka(
    'localhost:9092',
    'user_events',
    'clickhouse_group',
    'JSONEachRow'
) SETTINGS
    kafka_thread_per_consumer = 1,
    kafka_num_consumers = 4;

-- 创建物化视图将数据写入MergeTree表
CREATE MATERIALIZED VIEW analytics.user_events_mv TO analytics.user_events_storage AS
SELECT *
FROM analytics.user_events;"""
        
        # MySQL集成示例
        examples['mysql_sync'] = """
-- MySQL数据同步表
CREATE TABLE analytics.mysql_users (
    id UInt64,
    username String,
    email String,
    created_at DateTime
) ENGINE = MySQL(
    'mysql-server:3306',
    'production',
    'users',
    'readonly_user',
    'password123'
);

-- 查询MySQL数据
SELECT count(*) as total_users,
       toYYYYMM(created_at) as month
FROM analytics.mysql_users
GROUP BY month
ORDER BY month;"""
        
        # S3集成示例
        examples['s3_data_lake'] = """
-- S3数据湖表
CREATE TABLE analytics.s3_logs (
    timestamp DateTime,
    level String,
    message String,
    source String
) ENGINE = S3(
    'https://my-bucket.s3.amazonaws.com/logs/*.parquet',
    'AKIAIOSFODNN7EXAMPLE',
    'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
    'Parquet'
) SETTINGS
    s3_max_single_part_upload_size = 32000000,
    s3_min_upload_part_size = 16000000;"""
        
        # URL集成示例
        examples['url_api'] = """
-- URL API数据表
CREATE TABLE analytics.api_data (
    id UInt64,
    name String,
    value Float64,
    timestamp DateTime
) ENGINE = URL(
    'https://api.example.com/data.json',
    'JSONEachRow'
);

-- 查询API数据
SELECT name, avg(value) as avg_value
FROM analytics.api_data
WHERE timestamp >= now() - INTERVAL 1 DAY
GROUP BY name;"""
        
        return examples

# Integration引擎管理示例
print("\n\n=== ClickHouse Integration引擎管理 ===")

integration_manager = ClickHouseIntegrationManager()

print("\n1. Integration引擎特性:")
for engine, characteristics in list(integration_manager.integration_characteristics.items())[:3]:
    print(f"\n   {engine.value}:")
    print(f"     描述: {characteristics['description']}")
    print(f"     数据方向: {characteristics['data_direction']}")
    print(f"     实时性: {characteristics['real_time']}")
    print(f"     可扩展性: {characteristics['scalability']}")
    print(f"     适用场景: {', '.join(characteristics['use_cases'][:2])}")

print("\n2. Integration引擎推荐:")
integration_test_cases = [
    {
        'use_case': 'real_time_analytics',
        'requirements': {
            'real_time': True,
            'data_volume': 'large',
            'data_source': 'kafka_stream'
        }
    },
    {
        'use_case': 'data_migration',
        'requirements': {
            'real_time': False,
            'data_volume': 'medium',
            'data_source': 'mysql_database'
        }
    },
    {
        'use_case': 'data_lake_access',
        'requirements': {
            'real_time': False,
            'data_volume': 'large',
            'data_source': 's3_storage',
            'budget': 'low'
        }
    }
]

for i, case in enumerate(integration_test_cases, 1):
    recommendation = integration_manager.recommend_integration_engine(
        case['use_case'], 
        case['requirements']
    )
    
    print(f"\n   场景 {i} - {case['use_case']}:")
    if recommendation['primary_recommendation']:
        primary = recommendation['primary_recommendation']
        print(f"     推荐引擎: {primary['engine'].value}")
        print(f"     推荐评分: {primary['score']}/100")
        print(f"     推荐理由: {primary['reason']}")

print("\n3. Integration表DDL示例:")
# Kafka配置示例
kafka_config = IntegrationConfig(
    engine=IntegrationEngine.KAFKA,
    connection_params={
        'kafka_broker_list': 'localhost:9092',
        'kafka_topic_list': 'user_events',
        'kafka_group_name': 'clickhouse_analytics'
    },
    format='JSONEachRow',
    settings={'kafka_thread_per_consumer': 1}
)

event_columns = [
    {'name': 'user_id', 'type': 'UInt64'},
    {'name': 'event_type', 'type': 'String'},
    {'name': 'timestamp', 'type': 'DateTime'},
    {'name': 'properties', 'type': 'String'}
]

kafka_ddl = integration_manager.create_integration_table_ddl(
    'analytics', 'kafka_events', event_columns, kafka_config
)
print(f"\n   Kafka表:")
print(f"     {kafka_ddl[:200]}...")

# MySQL配置示例
mysql_config = IntegrationConfig(
    engine=IntegrationEngine.MYSQL,
    connection_params={
        'host': 'mysql-server',
        'port': 3306,
        'database': 'production',
        'table': 'users',
        'user': 'readonly',
        'password': 'secret'
    }
)

user_columns = [
    {'name': 'id', 'type': 'UInt64'},
    {'name': 'username', 'type': 'String'},
    {'name': 'email', 'type': 'String'},
    {'name': 'created_at', 'type': 'DateTime'}
]

mysql_ddl = integration_manager.create_integration_table_ddl(
    'analytics', 'mysql_users', user_columns, mysql_config
)
print(f"\n   MySQL表:")
print(f"     {mysql_ddl[:200]}...")

print("\n4. Integration使用示例:")
examples = integration_manager.generate_integration_examples()
for example_name, example_sql in list(examples.items())[:2]:
    print(f"\n   {example_name.replace('_', ' ').title()}:")
    print(f"     {example_sql[:250]}...")

Special引擎

Special引擎提供特殊功能,如内存存储、分布式查询等。

class SpecialEngine(Enum):
    """特殊引擎类型"""
    MEMORY = "Memory"
    DISTRIBUTED = "Distributed"
    DICTIONARY = "Dictionary"
    MERGE = "Merge"
    BUFFER = "Buffer"
    NULL = "Null"
    SET = "Set"
    JOIN = "Join"
    VIEW = "View"
    MATERIALIZED_VIEW = "MaterializedView"

class ClickHouseSpecialEngineManager:
    """ClickHouse特殊引擎管理器"""
    
    def __init__(self):
        self.special_engine_characteristics = self._initialize_special_characteristics()
    
    def _initialize_special_characteristics(self) -> Dict[SpecialEngine, Dict[str, Any]]:
        """初始化特殊引擎特性"""
        return {
            SpecialEngine.MEMORY: {
                "description": "数据完全存储在内存中的表引擎",
                "use_cases": ["临时计算", "缓存", "小数据集"],
                "advantages": ["极快访问", "简单", "无磁盘IO"],
                "limitations": ["数据易失", "内存限制", "重启丢失"],
                "persistence": False,
                "performance": "very_high",
                "memory_usage": "high"
            },
            SpecialEngine.DISTRIBUTED: {
                "description": "分布式表引擎,将查询分发到多个节点",
                "use_cases": ["集群查询", "数据分片", "负载均衡"],
                "advantages": ["水平扩展", "并行处理", "高可用"],
                "limitations": ["网络开销", "配置复杂", "一致性问题"],
                "persistence": True,
                "performance": "high",
                "memory_usage": "low"
            },
            SpecialEngine.BUFFER: {
                "description": "缓冲表引擎,批量写入目标表",
                "use_cases": ["写入优化", "批量处理", "减少IO"],
                "advantages": ["提高写入性能", "减少小批量写入", "自动刷新"],
                "limitations": ["数据延迟", "内存占用", "可能丢失数据"],
                "persistence": False,
                "performance": "high",
                "memory_usage": "medium"
            },
            SpecialEngine.MERGE: {
                "description": "合并多个表的查询结果",
                "use_cases": ["表合并", "数据联合", "分区查询"],
                "advantages": ["逻辑合并", "简化查询", "透明访问"],
                "limitations": ["性能开销", "复杂性", "维护困难"],
                "persistence": True,
                "performance": "medium",
                "memory_usage": "low"
            },
            SpecialEngine.NULL: {
                "description": "空表引擎,丢弃所有写入数据",
                "use_cases": ["性能测试", "数据丢弃", "调试"],
                "advantages": ["零开销", "测试友好", "简单"],
                "limitations": ["数据丢失", "无查询", "功能有限"],
                "persistence": False,
                "performance": "very_high",
                "memory_usage": "very_low"
            }
        }
    
    def create_memory_table_ddl(self, database: str, table_name: str,
                               columns: List[Dict[str, str]]) -> str:
        """创建Memory表DDL"""
        ddl_parts = []
        
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        column_definitions = []
        for col in columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        ddl_parts.append("ENGINE = Memory")
        
        return "\n".join(ddl_parts) + ";"
    
    def create_distributed_table_ddl(self, database: str, table_name: str,
                                    columns: List[Dict[str, str]],
                                    cluster_name: str, remote_database: str,
                                    remote_table: str, sharding_key: str = "") -> str:
        """创建Distributed表DDL"""
        ddl_parts = []
        
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        column_definitions = []
        for col in columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        
        # Distributed引擎参数
        engine_params = [
            f"'{cluster_name}'",
            f"'{remote_database}'",
            f"'{remote_table}'"
        ]
        
        if sharding_key:
            engine_params.append(sharding_key)
        
        ddl_parts.append(f"ENGINE = Distributed({', '.join(engine_params)})")
        
        return "\n".join(ddl_parts) + ";"
    
    def create_buffer_table_ddl(self, database: str, table_name: str,
                               columns: List[Dict[str, str]],
                               target_database: str, target_table: str,
                               buffer_settings: Optional[Dict[str, int]] = None) -> str:
        """创建Buffer表DDL"""
        if buffer_settings is None:
            buffer_settings = {
                'num_layers': 16,
                'min_time': 10,
                'max_time': 100,
                'min_rows': 10000,
                'max_rows': 1000000,
                'min_bytes': 10000000,
                'max_bytes': 100000000
            }
        
        ddl_parts = []
        
        ddl_parts.append(f"CREATE TABLE `{database}`.`{table_name}` (")
        
        column_definitions = []
        for col in columns:
            col_def = f"    `{col['name']}` {col['type']}"
            if 'comment' in col:
                col_def += f" COMMENT '{col['comment']}'"
            column_definitions.append(col_def)
        
        ddl_parts.append(",\n".join(column_definitions))
        ddl_parts.append(")")
        
        # Buffer引擎参数
        buffer_params = [
            f"'{target_database}'",
            f"'{target_table}'",
            str(buffer_settings['num_layers']),
            str(buffer_settings['min_time']),
            str(buffer_settings['max_time']),
            str(buffer_settings['min_rows']),
            str(buffer_settings['max_rows']),
            str(buffer_settings['min_bytes']),
            str(buffer_settings['max_bytes'])
        ]
        
        ddl_parts.append(f"ENGINE = Buffer({', '.join(buffer_params)})")
        
        return "\n".join(ddl_parts) + ";"
    
    def recommend_special_engine(self, use_case: str, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """推荐特殊引擎"""
        recommendations = []
        
        data_size = requirements.get('data_size', 'medium')  # small, medium, large
        performance_priority = requirements.get('performance_priority', 'medium')  # low, medium, high
        persistence_required = requirements.get('persistence_required', True)
        memory_available = requirements.get('memory_available', 'medium')  # low, medium, high
        
        # Memory引擎推荐
        if (data_size == 'small' and performance_priority == 'high' and 
            not persistence_required and memory_available in ['medium', 'high']):
            recommendations.append({
                'engine': SpecialEngine.MEMORY,
                'score': 95,
                'reason': '小数据量且需要极高性能,Memory引擎最适合'
            })
        
        # Distributed引擎推荐
        if data_size == 'large' or 'cluster' in use_case.lower():
            recommendations.append({
                'engine': SpecialEngine.DISTRIBUTED,
                'score': 90,
                'reason': '大数据量或集群环境,Distributed引擎提供水平扩展'
            })
        
        # Buffer引擎推荐
        if 'write_intensive' in use_case.lower() or performance_priority == 'high':
            recommendations.append({
                'engine': SpecialEngine.BUFFER,
                'score': 85,
                'reason': '写入密集型场景,Buffer引擎可以优化写入性能'
            })
        
        # Null引擎推荐
        if 'test' in use_case.lower() or 'benchmark' in use_case.lower():
            recommendations.append({
                'engine': SpecialEngine.NULL,
                'score': 90,
                'reason': '测试或基准测试场景,Null引擎提供零开销'
            })
        
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        
        return {
            'primary_recommendation': recommendations[0] if recommendations else None,
            'all_recommendations': recommendations,
            'analysis': {
                'use_case': use_case,
                'requirements': requirements
            }
        }
    
    def generate_special_engine_examples(self) -> Dict[str, str]:
        """生成特殊引擎示例"""
        examples = {}
        
        examples['memory_cache'] = """
-- Memory表作为缓存
CREATE TABLE cache.user_sessions (
    session_id String,
    user_id UInt64,
    start_time DateTime,
    last_activity DateTime,
    page_views UInt32
) ENGINE = Memory;

-- 插入会话数据
INSERT INTO cache.user_sessions VALUES
('sess_123', 1001, now() - INTERVAL 1 HOUR, now(), 5),
('sess_124', 1002, now() - INTERVAL 30 MINUTE, now(), 3);

-- 查询活跃会话
SELECT user_id, count(*) as active_sessions
FROM cache.user_sessions
WHERE last_activity >= now() - INTERVAL 15 MINUTE
GROUP BY user_id;"""
        
        examples['distributed_cluster'] = """
-- 分布式表
CREATE TABLE analytics.events_distributed (
    event_id UInt64,
    user_id UInt64,
    event_time DateTime,
    event_type String
) ENGINE = Distributed(
    'analytics_cluster',
    'analytics',
    'events_local',
    rand()
);

-- 查询分布式数据
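-- The query below is fanned out to events_local on every shard of analytics_cluster;
-- partial results are merged on the node that received the query.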
SELECT event_type, count(*) as event_count
FROM analytics.events_distributed
WHERE event_time >= today()
GROUP BY event_type
ORDER BY event_count DESC;"""
        
        examples['buffer_optimization'] = """
-- Buffer表优化写入
CREATE TABLE analytics.events_buffer (
    event_id UInt64,
    user_id UInt64,
    event_time DateTime,
    event_type String
) ENGINE = Buffer(
    'analytics',
    'events',
    16,     -- num_layers
    10,     -- min_time
    100,    -- max_time
    10000,  -- min_rows
    1000000,-- max_rows
    10000000,   -- min_bytes
    100000000   -- max_bytes
);

-- 高频写入到Buffer表
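-- Note: SELECTs on events_buffer read both the in-memory buffer and the rows
-- already flushed to analytics.events, so recent inserts are visible immediately.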
INSERT INTO analytics.events_buffer VALUES
(1, 1001, now(), 'click'),
(2, 1002, now(), 'view');"""
        
        return examples

# Special引擎管理示例
print("\n\n=== ClickHouse Special引擎管理 ===")

special_engine_manager = ClickHouseSpecialEngineManager()

print("\n1. Special引擎特性:")
for engine, characteristics in list(special_engine_manager.special_engine_characteristics.items())[:3]:
    print(f"\n   {engine.value}:")
    print(f"     描述: {characteristics['description']}")
    print(f"     持久化: {characteristics['persistence']}")
    print(f"     性能: {characteristics['performance']}")
    print(f"     内存使用: {characteristics['memory_usage']}")
    print(f"     适用场景: {', '.join(characteristics['use_cases'])}")

print("\n2. Special引擎推荐:")
special_test_cases = [
    {
        'use_case': 'high_performance_cache',
        'requirements': {
            'data_size': 'small',
            'performance_priority': 'high',
            'persistence_required': False,
            'memory_available': 'high'
        }
    },
    {
        'use_case': 'cluster_analytics',
        'requirements': {
            'data_size': 'large',
            'performance_priority': 'medium',
            'persistence_required': True
        }
    },
    {
        'use_case': 'write_intensive_logging',
        'requirements': {
            'data_size': 'medium',
            'performance_priority': 'high',
            'persistence_required': True
        }
    }
]

for i, case in enumerate(special_test_cases, 1):
    recommendation = special_engine_manager.recommend_special_engine(
        case['use_case'], 
        case['requirements']
    )
    
    print(f"\n   场景 {i} - {case['use_case']}:")
    if recommendation['primary_recommendation']:
        primary = recommendation['primary_recommendation']
        print(f"     推荐引擎: {primary['engine'].value}")
        print(f"     推荐评分: {primary['score']}/100")
        print(f"     推荐理由: {primary['reason']}")

print("\n3. Special引擎DDL示例:")
# Memory表示例
cache_columns = [
    {'name': 'key', 'type': 'String'},
    {'name': 'value', 'type': 'String'},
    {'name': 'expire_time', 'type': 'DateTime'}
]

memory_ddl = special_engine_manager.create_memory_table_ddl(
    'cache', 'temp_data', cache_columns
)
print(f"\n   Memory表:")
print(f"     {memory_ddl}")

# Distributed表示例
event_columns = [
    {'name': 'event_id', 'type': 'UInt64'},
    {'name': 'user_id', 'type': 'UInt64'},
    {'name': 'event_time', 'type': 'DateTime'},
    {'name': 'event_type', 'type': 'String'}
]

distributed_ddl = special_engine_manager.create_distributed_table_ddl(
    'analytics', 'events_distributed', event_columns,
    'analytics_cluster', 'analytics', 'events_local', 'rand()'
)
print(f"\n   Distributed表:")
print(f"     {distributed_ddl[:200]}...")

print("\n4. Special引擎使用示例:")
special_examples = special_engine_manager.generate_special_engine_examples()
for example_name, example_sql in list(special_examples.items())[:2]:
    print(f"\n   {example_name.replace('_', ' ').title()}:")
print(f"     {example_sql[:300]}...")

总结

关键要点

  1. 表引擎选择原则

    • MergeTree系列:适用于大部分OLAP场景,提供高性能查询和数据压缩
    • Log系列:适用于小数据量和临时数据,简单高效但功能有限
    • Integration引擎:用于外部系统集成,实现数据的实时同步和访问
    • Special引擎:提供特殊功能,如内存存储、分布式查询等
  2. MergeTree引擎特点

    • 支持主键索引和分区
    • 提供多种变体(Replacing、Summing、Aggregating等)
    • 优秀的压缩比和查询性能
    • 支持数据去重和预聚合
  3. 引擎性能对比

    • 查询性能:MergeTree > Log > Integration
    • 写入性能:Buffer > Memory > MergeTree > Log
    • 存储效率:MergeTree > Log > Memory
    • 功能完整性:MergeTree > Integration > Special > Log

最佳实践

1. **Engine selection strategy**
   ```python
   # Pick an engine by data volume (sizes in GB)
   if data_size_gb < 1:
       recommended_engine = "TinyLog"        # small data sets
   elif data_size_gb < 100:
       recommended_engine = "MergeTree"      # medium data sets
   else:
       recommended_engine = "Distributed"    # large data sets

   # Pick an engine by use case
   if use_case == "real_time_analytics":
       recommended_engine = "Kafka + MaterializedView"
   elif use_case == "data_warehouse":
       recommended_engine = "MergeTree"
   elif use_case == "temporary_cache":
       recommended_engine = "Memory"
   ```

2. **Partitioning strategy design**
   ```python
   # Time-based partitioning expressions
   partition_strategies = {
       "daily": "toYYYYMMDD(date_column)",
       "monthly": "toYYYYMM(date_column)",
       "yearly": "toYear(date_column)"
   }

   # Choose the partition granularity by daily data volume (in GB)
   if daily_data_size_gb < 1:
       partition_strategy = "monthly"
   elif daily_data_size_gb < 10:
       partition_strategy = "daily"
   else:
       partition_strategy = "hourly"
   ```

3. **Index optimization tips**
   ```python
   # Primary key design principles
   primary_key_guidelines = {
       "cardinality": "choose columns of moderate cardinality for the key",
       "query_pattern": "order the key columns to match the dominant query filters",
       "data_distribution": "make sure data is evenly distributed across the key"
   }

   # Data-skipping (secondary) index types
   secondary_indexes = {
       "minmax": "suited to range filters",
       "set": "suited to IN filters",
       "bloom_filter": "suited to equality lookups"
   }
   ```

4. **Compression codec selection**
   ```python
   # Codec trade-offs
   compression_comparison = {
       "LZ4": {"speed": "fast", "ratio": "medium", "cpu": "low"},
       "ZSTD": {"speed": "medium", "ratio": "high", "cpu": "medium"},
       "LZ4HC": {"speed": "slow", "ratio": "high", "cpu": "high"}
   }

   # Selection heuristic
   if cpu_resources == "limited":
       recommended_compression = "LZ4"
   elif storage_cost == "high":
       recommended_compression = "ZSTD"
   else:
       recommended_compression = "LZ4HC"
   ```

5. **Integration engine configuration**
   ```python
   # Kafka integration best practices
   kafka_best_practices = {
       "consumer_groups": "use a dedicated consumer group",
       "batch_size": "tune the batch size to the data volume",
       "materialized_view": "persist the stream through a materialized view",
       "error_handling": "configure an error-handling mechanism"
   }

   # MySQL integration tuning
   mysql_optimization = {
       "connection_pool": "use a connection pool",
       "read_only": "connect with a read-only user",
       "index_hints": "provide index hints",
       "query_cache": "enable the query cache"
   }
   ```
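
Taken together, the guidelines above map onto a single table definition roughly as follows. This is an illustrative sketch only: the table name analytics.user_events, its columns, the ZSTD(3) codec level, the set(100) skipping index and the 90-day TTL are assumptions, not values taken from this chapter.

```python
# Hypothetical DDL combining partitioning, key order, a skipping index,
# a per-column codec and a TTL in one MergeTree table.
best_practice_ddl = """
CREATE TABLE analytics.user_events (
    user_id UInt64,
    event_time DateTime,
    event_type LowCardinality(String),
    payload String CODEC(ZSTD(3)),
    INDEX idx_event_type event_type TYPE set(100) GRANULARITY 4
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)   -- monthly partitions
ORDER BY (user_id, event_time)      -- matches the dominant filter columns
TTL event_time + INTERVAL 90 DAY    -- expire rows after 90 days
"""
```

Because no separate PRIMARY KEY clause is given, the ORDER BY columns double as the primary key, which is the common default.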


### 下一步学习

1. **查询优化**
   - 学习ClickHouse查询执行计划
   - 掌握查询性能调优技巧
   - 了解并行查询和分布式查询

2. **集群部署**
   - 学习ClickHouse集群架构
   - 掌握分片和副本配置
   - 了解集群监控和维护

3. **数据建模**
   - 学习维度建模在ClickHouse中的应用
   - 掌握数据仓库设计模式
   - 了解实时数据处理架构

4. **运维管理**
   - 学习备份和恢复策略
   - 掌握性能监控和告警
   - 了解容量规划和扩容策略

通过本章的学习,你应该已经掌握了ClickHouse各种表引擎的特点和使用场景。表引擎的选择直接影响到系统的性能和功能,因此在实际项目中需要根据具体需求进行合理选择。下一章我们将深入学习ClickHouse的查询优化技巧。