2.1 数据库和表的概念

2.1.1 数据库管理

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
import time
import random

class DatabaseEngine(Enum):
    """Database engines a database record can be tagged with."""
    ORDINARY = "Ordinary"  # ordinary database
    MEMORY = "Memory"      # in-memory database
    LAZY = "Lazy"          # lazily loaded database
    MYSQL = "MySQL"        # MySQL database engine
    POSTGRESQL = "PostgreSQL"  # PostgreSQL database engine

class TableStatus(Enum):
    """States a table record can be in."""
    ACTIVE = "active"
    READONLY = "readonly"
    DROPPED = "dropped"  # set by ClickHouseDatabaseManager.drop_table; the record itself is kept
    DETACHED = "detached"

@dataclass
class DatabaseInfo:
    """Descriptive record for one database tracked by ClickHouseDatabaseManager."""
    name: str  # database name; also the key in the manager's dictionaries
    engine: DatabaseEngine
    data_path: str  # data directory; empty for the built-in information_schema entry
    metadata_path: str  # metadata directory; empty for the built-in information_schema entry
    tables_count: int = 0  # adjusted by the manager's create_table / drop_table
    total_size_bytes: int = 0
    created_time: float = 0.0  # time.time() style timestamp

@dataclass
class TableInfo:
    """Descriptive record for one table tracked by ClickHouseDatabaseManager."""
    database: str  # name of the owning database
    name: str
    engine: str  # table engine name as passed by the caller, e.g. "MergeTree"
    status: TableStatus
    rows_count: int = 0
    size_bytes: int = 0
    columns_count: int = 0
    partitions_count: int = 0
    created_time: float = 0.0   # time.time() style timestamp
    last_modified: float = 0.0  # time.time() style timestamp

class ClickHouseDatabaseManager:
    """In-memory model of ClickHouse database and table bookkeeping.

    Nothing here talks to a server. Databases live in ``self.databases``
    (name -> DatabaseInfo) and tables in ``self.tables`` (database name ->
    list of TableInfo). Dropping a table is a soft delete: the record stays
    in the list with status ``DROPPED`` and is hidden by the query methods.
    """

    # Databases that drop_database() refuses to remove.
    _PROTECTED_DATABASES = ("system", "information_schema")

    def __init__(self):
        self.databases: Dict[str, DatabaseInfo] = {}
        self.tables: Dict[str, List[TableInfo]] = {}
        self._initialize_system_databases()

    def _initialize_system_databases(self):
        """Register the built-in databases with their sample sizes and counts."""
        created = time.time() - 86400  # pretend they were created one day ago
        system_databases = [
            DatabaseInfo(
                name="system",
                engine=DatabaseEngine.ORDINARY,
                data_path="/var/lib/clickhouse/data/system/",
                metadata_path="/var/lib/clickhouse/metadata/system/",
                tables_count=50,
                total_size_bytes=1024 * 1024 * 10,  # 10MB
                created_time=created,
            ),
            DatabaseInfo(
                name="default",
                engine=DatabaseEngine.ORDINARY,
                data_path="/var/lib/clickhouse/data/default/",
                metadata_path="/var/lib/clickhouse/metadata/default/",
                tables_count=0,
                total_size_bytes=0,
                created_time=created,
            ),
            DatabaseInfo(
                name="information_schema",
                engine=DatabaseEngine.MEMORY,
                data_path="",
                metadata_path="",
                tables_count=10,
                total_size_bytes=1024 * 1024,  # 1MB
                created_time=created,
            ),
        ]

        for db in system_databases:
            self.databases[db.name] = db
            self.tables[db.name] = []

    def _find_active_table(self, database: str, table_name: str) -> Optional[TableInfo]:
        """Return the ACTIVE record for ``database.table_name``, or None."""
        for table in self.tables.get(database, []):
            if table.name == table_name and table.status == TableStatus.ACTIVE:
                return table
        return None

    def create_database(self, name: str, engine: DatabaseEngine = DatabaseEngine.ORDINARY) -> bool:
        """Create a database.

        Returns:
            True when the database was created, False when the name is taken.
        """
        if name in self.databases:
            return False

        self.databases[name] = DatabaseInfo(
            name=name,
            engine=engine,
            data_path=f"/var/lib/clickhouse/data/{name}/",
            metadata_path=f"/var/lib/clickhouse/metadata/{name}/",
            created_time=time.time(),
        )
        self.tables[name] = []
        return True

    def drop_database(self, name: str) -> bool:
        """Remove a database together with its table records.

        Returns:
            False when the database is unknown or protected, True otherwise.
        """
        if name not in self.databases or name in self._PROTECTED_DATABASES:
            return False

        del self.databases[name]
        del self.tables[name]
        return True

    def list_databases(self) -> List[DatabaseInfo]:
        """Return all database records in registration order."""
        return list(self.databases.values())

    def get_database_info(self, name: str) -> Optional[DatabaseInfo]:
        """Return the record for ``name``, or None when it is unknown."""
        return self.databases.get(name)

    def create_table(self, database: str, table_name: str, engine: str) -> bool:
        """Create a table in ``database``.

        Returns:
            False when the database is unknown or an active table with the
            same name already exists; True when the table was created.
        """
        if database not in self.databases:
            return False
        # Without this check duplicates pile up and tables_count over-counts.
        if self._find_active_table(database, table_name) is not None:
            return False

        now = time.time()
        self.tables[database].append(
            TableInfo(
                database=database,
                name=table_name,
                engine=engine,
                status=TableStatus.ACTIVE,
                created_time=now,
                last_modified=now,
            )
        )
        self.databases[database].tables_count += 1
        return True

    def drop_table(self, database: str, table_name: str) -> bool:
        """Soft-delete a table by marking its record as DROPPED.

        Only an ACTIVE record is eligible, so dropping the same table twice
        returns False the second time instead of decrementing
        ``tables_count`` again.
        """
        table = self._find_active_table(database, table_name)
        if table is None:
            return False

        table.status = TableStatus.DROPPED
        self.databases[database].tables_count -= 1
        return True

    def list_tables(self, database: str) -> List[TableInfo]:
        """Return the ACTIVE tables of ``database`` (empty list when unknown)."""
        return [
            table
            for table in self.tables.get(database, [])
            if table.status == TableStatus.ACTIVE
        ]

    def get_table_info(self, database: str, table_name: str) -> Optional[TableInfo]:
        """Return the ACTIVE record for a table, or None when there is none."""
        return self._find_active_table(database, table_name)

    def get_database_statistics(self) -> Dict[str, Any]:
        """Summarise databases, tracked active tables and total size."""
        total_tables = sum(len(self.list_tables(db_name)) for db_name in self.databases)
        total_size = sum(db.total_size_bytes for db in self.databases.values())

        return {
            "total_databases": len(self.databases),
            "total_tables": total_tables,
            "total_size_mb": round(total_size / 1024 / 1024, 2),
            "databases_by_engine": self._get_databases_by_engine(),
            "largest_databases": self._get_largest_databases(),
        }

    def _get_databases_by_engine(self) -> Dict[str, int]:
        """Count databases per engine value."""
        engine_count: Dict[str, int] = {}
        for db in self.databases.values():
            engine = db.engine.value
            engine_count[engine] = engine_count.get(engine, 0) + 1
        return engine_count

    def _get_largest_databases(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``limit`` databases, largest ``total_size_bytes`` first."""
        sorted_dbs = sorted(
            self.databases.values(),
            key=lambda db: db.total_size_bytes,
            reverse=True,
        )
        return [
            {
                "name": db.name,
                "size_mb": round(db.total_size_bytes / 1024 / 1024, 2),
                "tables_count": db.tables_count,
            }
            for db in sorted_dbs[:limit]
        ]

# Database management demo: exercises ClickHouseDatabaseManager end to end
# and prints the results.
print("=== ClickHouse数据库管理 ===")

db_manager = ClickHouseDatabaseManager()

# Step 1: list the databases registered by the constructor.
print("\n1. 系统数据库列表:")
for db in db_manager.list_databases():
    print(f"   - {db.name} ({db.engine.value}): {db.tables_count}个表")

# Step 2: create the business databases with the default engine.
print("\n2. 创建业务数据库:")
business_dbs = ["analytics", "logs", "metrics"]
for db_name in business_dbs:
    success = db_manager.create_database(db_name)
    print(f"   创建数据库 {db_name}: {'成功' if success else '失败'}")

# Step 3: create sample tables as (database, table, engine) triples.
print("\n3. 创建示例表:")
tables_to_create = [
    ("analytics", "user_events", "MergeTree"),
    ("analytics", "page_views", "MergeTree"),
    ("logs", "access_logs", "MergeTree"),
    ("logs", "error_logs", "MergeTree"),
    ("metrics", "system_metrics", "SummingMergeTree")
]

for db, table, engine in tables_to_create:
    success = db_manager.create_table(db, table, engine)
    print(f"   创建表 {db}.{table} ({engine}): {'成功' if success else '失败'}")

# Step 4: print the aggregate statistics.
print("\n4. 数据库统计信息:")
stats = db_manager.get_database_statistics()
print(f"   总数据库数: {stats['total_databases']}")
print(f"   总表数: {stats['total_tables']}")
print(f"   总大小: {stats['total_size_mb']} MB")

print("\n   按引擎分布:")
for engine, count in stats['databases_by_engine'].items():
    print(f"     {engine}: {count}个数据库")

# Step 5: list the active tables of selected databases.
print("\n5. 查看特定数据库的表:")
for db_name in ["analytics", "logs"]:
    tables = db_manager.list_tables(db_name)
    print(f"\n   数据库 {db_name} 的表:")
    for table in tables:
        print(f"     - {table.name} ({table.engine})")

2.1.2 表的生命周期管理

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import time
import json

class TableOperation(Enum):
    """Kinds of operation that can be performed on a table.

    NOTE(review): a dataclass that is also named ``TableOperation`` is
    declared further down in this file; that later class statement rebinds
    the name, so from then on ``TableOperation`` no longer refers to this enum.
    """
    CREATE = "create"
    ALTER = "alter"
    DROP = "drop"
    DETACH = "detach"
    ATTACH = "attach"
    RENAME = "rename"
    TRUNCATE = "truncate"

class PartitionStatus(Enum):
    """States a partition record can be in."""
    ACTIVE = "active"
    INACTIVE = "inactive"
    READONLY = "readonly"
    OUTDATED = "outdated"

# The enum declared earlier shares this dataclass's name, so the class
# statement below rebinds ``TableOperation``. Keep a handle on the enum first,
# otherwise it becomes unreachable.
TableOperationType = TableOperation


@dataclass
class TableOperation:
    """One entry of the table-operation log.

    Attributes:
        operation_type: which kind of operation was attempted.
        database: database the table belongs to.
        table_name: table the operation targeted.
        timestamp: ``time.time()`` value taken when the record was built.
        user: name of the user the operation is attributed to.
        details: operation-specific payload (e.g. engine, columns, TTL).
        success: whether the operation completed.
        error_message: failure description, or None on success.
    """
    operation_type: TableOperationType
    database: str
    table_name: str
    timestamp: float
    user: str
    details: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error_message: Optional[str] = None


# Code in this file refers to the operation kinds as ``TableOperation.CREATE``,
# ``TableOperation.ALTER`` and so on. Because the name now denotes the record
# class, those lookups would raise AttributeError; re-export the enum members
# as plain class attributes (they are not dataclass fields).
for _operation_type in TableOperationType:
    setattr(TableOperation, _operation_type.name, _operation_type)

@dataclass
class PartitionInfo:
    """Descriptive record for one partition of a table."""
    partition_id: str  # e.g. "202401" or "20240101" in the demo below
    table: str         # name of the owning table
    database: str      # name of the owning database
    status: PartitionStatus
    rows_count: int = 0
    size_bytes: int = 0
    min_date: Optional[str] = None
    max_date: Optional[str] = None
    created_time: float = 0.0   # time.time() style timestamp
    last_modified: float = 0.0  # time.time() style timestamp

class ClickHouseTableLifecycleManager:
    """In-memory simulation of table lifecycle operations.

    State:
        operations_log: append-only list of TableOperation records.
        partitions: "database.table" -> list of PartitionInfo. The entry is
            removed when the table is dropped.
        table_metadata: "database.table" -> metadata dict whose "status" is
            one of "active", "detached" or "dropped".
    """

    def __init__(self):
        self.operations_log: List[TableOperation] = []
        self.partitions: Dict[str, List[PartitionInfo]] = {}  # table_key -> partitions
        self.table_metadata: Dict[str, Dict[str, Any]] = {}  # table_key -> metadata

    def _get_table_key(self, database: str, table_name: str) -> str:
        """Return the "database.table" key used by the internal dictionaries."""
        return f"{database}.{table_name}"

    def _get_live_metadata(self, table_key: str) -> Optional[Dict[str, Any]]:
        """Return the metadata of a table that exists and is not dropped."""
        metadata = self.table_metadata.get(table_key)
        if metadata is None or metadata["status"] == "dropped":
            return None
        return metadata

    def _record(self, operation: TableOperation, error: Optional[str] = None) -> bool:
        """Finalise ``operation``, append it to the log and return its success flag."""
        operation.success = error is None
        operation.error_message = error
        self.operations_log.append(operation)
        return operation.success

    def create_table_with_lifecycle(self, database: str, table_name: str,
                                   engine: str, columns: List[Dict[str, str]],
                                   partition_by: Optional[str] = None,
                                   order_by: Optional[str] = None,
                                   ttl: Optional[str] = None) -> bool:
        """Register a table together with its lifecycle settings.

        The attempt is always logged. Creating a table that already exists
        (and is not dropped) fails instead of silently wiping its partitions.

        Returns:
            True when the table was created, False otherwise.
        """
        table_key = self._get_table_key(database, table_name)

        operation = TableOperation(
            operation_type=TableOperation.CREATE,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin",
            details={
                "engine": engine,
                # Own copy: later ALTERs must not rewrite the logged history.
                "columns": list(columns),
                "partition_by": partition_by,
                "order_by": order_by,
                "ttl": ttl
            }
        )

        if self._get_live_metadata(table_key) is not None:
            return self._record(operation, f"table {table_key} already exists")

        self.table_metadata[table_key] = {
            "database": database,
            "table_name": table_name,
            "engine": engine,
            # Own copy: the caller's list must not change when columns are altered.
            "columns": list(columns),
            "partition_by": partition_by,
            "order_by": order_by,
            "ttl": ttl,
            "created_time": time.time(),
            "status": "active"
        }
        self.partitions[table_key] = []
        return self._record(operation)

    def alter_table(self, database: str, table_name: str,
                   alter_type: str, details: Dict[str, Any]) -> bool:
        """Apply a schema change to an existing, non-dropped table.

        Supported ``alter_type`` values and the ``details`` key each one reads:
        "ADD_COLUMN" ("column"), "DROP_COLUMN" ("column_name") and
        "MODIFY_TTL" ("ttl").

        Returns:
            False without logging when the table is unknown or dropped;
            False with a logged failure when the alter type is unsupported or
            a required detail is missing; True otherwise.
        """
        table_key = self._get_table_key(database, table_name)
        metadata = self._get_live_metadata(table_key)
        if metadata is None:
            return False

        operation = TableOperation(
            operation_type=TableOperation.ALTER,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin",
            details={"alter_type": alter_type, **details}
        )

        try:
            if alter_type == "ADD_COLUMN":
                # Build a new list rather than appending in place.
                metadata["columns"] = [*metadata["columns"], details["column"]]
            elif alter_type == "DROP_COLUMN":
                metadata["columns"] = [
                    col for col in metadata["columns"]
                    if col["name"] != details["column_name"]
                ]
            elif alter_type == "MODIFY_TTL":
                metadata["ttl"] = details["ttl"]
            else:
                return self._record(operation, f"unsupported alter type: {alter_type}")
        except (KeyError, TypeError) as exc:
            # Missing detail key or malformed column entry.
            return self._record(operation, str(exc))

        return self._record(operation)

    def drop_table(self, database: str, table_name: str) -> bool:
        """Mark a table as dropped and discard its partitions.

        The attempt is always logged. Dropping a table that does not exist or
        is already dropped is recorded as a failure.

        Returns:
            True when the table was dropped, False otherwise.
        """
        table_key = self._get_table_key(database, table_name)

        operation = TableOperation(
            operation_type=TableOperation.DROP,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin"
        )

        metadata = self._get_live_metadata(table_key)
        if metadata is None:
            return self._record(operation, f"table {table_key} does not exist")

        metadata["status"] = "dropped"
        self.partitions.pop(table_key, None)
        return self._record(operation)

    def detach_table(self, database: str, table_name: str) -> bool:
        """Mark an existing, non-dropped table as detached."""
        table_key = self._get_table_key(database, table_name)
        metadata = self._get_live_metadata(table_key)
        if metadata is None:
            return False

        operation = TableOperation(
            operation_type=TableOperation.DETACH,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin"
        )

        metadata["status"] = "detached"
        return self._record(operation)

    def attach_table(self, database: str, table_name: str) -> bool:
        """Mark an existing, non-dropped table as active again.

        A dropped table cannot be attached: its partitions are gone, so
        reviving it would leave an "active" table that rejects new partitions.
        """
        table_key = self._get_table_key(database, table_name)
        metadata = self._get_live_metadata(table_key)
        if metadata is None:
            return False

        operation = TableOperation(
            operation_type=TableOperation.ATTACH,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin"
        )

        metadata["status"] = "active"
        return self._record(operation)

    def add_partition(self, database: str, table_name: str,
                     partition_id: str, rows_count: int = 0,
                     size_bytes: int = 0) -> bool:
        """Append an ACTIVE partition to a table; False when the table has no partition list."""
        table_key = self._get_table_key(database, table_name)

        if table_key not in self.partitions:
            return False

        now = time.time()
        self.partitions[table_key].append(
            PartitionInfo(
                partition_id=partition_id,
                table=table_name,
                database=database,
                status=PartitionStatus.ACTIVE,
                rows_count=rows_count,
                size_bytes=size_bytes,
                created_time=now,
                last_modified=now
            )
        )
        return True

    def drop_partition(self, database: str, table_name: str, partition_id: str) -> bool:
        """Mark the first partition with ``partition_id`` as INACTIVE.

        Returns:
            True when a matching partition was found, False otherwise.
        """
        table_key = self._get_table_key(database, table_name)

        for partition in self.partitions.get(table_key, []):
            if partition.partition_id == partition_id:
                partition.status = PartitionStatus.INACTIVE
                return True
        return False

    def get_table_partitions(self, database: str, table_name: str) -> List[PartitionInfo]:
        """Return the partition records of a table (empty list when unknown)."""
        table_key = self._get_table_key(database, table_name)
        return self.partitions.get(table_key, [])

    def get_operations_history(self, database: Optional[str] = None,
                              table_name: Optional[str] = None,
                              limit: int = 100) -> List[TableOperation]:
        """Return up to ``limit`` logged operations, newest first.

        Args:
            database: when given, keep only operations on this database.
            table_name: when given, keep only operations on this table name.
            limit: maximum number of records returned.
        """
        operations = self.operations_log

        if database:
            operations = [op for op in operations if op.database == database]

        if table_name:
            operations = [op for op in operations if op.table_name == table_name]

        return sorted(operations, key=lambda op: op.timestamp, reverse=True)[:limit]

    def get_lifecycle_statistics(self) -> Dict[str, Any]:
        """Summarise the operation log, active tables and partition records."""
        total_operations = len(self.operations_log)
        successful_operations = sum(1 for op in self.operations_log if op.success)

        operations_by_type: Dict[str, int] = {}
        for op in self.operations_log:
            op_type = op.operation_type.value
            operations_by_type[op_type] = operations_by_type.get(op_type, 0) + 1

        active_tables = sum(
            1 for metadata in self.table_metadata.values()
            if metadata["status"] == "active"
        )
        total_partitions = sum(len(partitions) for partitions in self.partitions.values())

        return {
            "total_operations": total_operations,
            "successful_operations": successful_operations,
            # Guard against division by zero on an empty log.
            "success_rate": round(successful_operations / total_operations * 100, 2) if total_operations > 0 else 0,
            "operations_by_type": operations_by_type,
            "active_tables": active_tables,
            "total_partitions": total_partitions
        }

# Table lifecycle demo: exercises ClickHouseTableLifecycleManager end to end
# and prints the results.
print("\n=== ClickHouse表生命周期管理 ===")

lifecycle_manager = ClickHouseTableLifecycleManager()

# Step 1: create tables; each dict is unpacked as keyword arguments of
# create_table_with_lifecycle().
print("\n1. 创建表:")
tables_config = [
    {
        "database": "analytics",
        "table_name": "user_events",
        "engine": "MergeTree",
        "columns": [
            {"name": "user_id", "type": "UInt64"},
            {"name": "event_type", "type": "String"},
            {"name": "timestamp", "type": "DateTime"},
            {"name": "properties", "type": "String"}
        ],
        "partition_by": "toYYYYMM(timestamp)",
        "order_by": "(user_id, timestamp)",
        "ttl": "timestamp + INTERVAL 1 YEAR"
    },
    {
        "database": "logs",
        "table_name": "access_logs",
        "engine": "MergeTree",
        "columns": [
            {"name": "ip", "type": "String"},
            {"name": "method", "type": "String"},
            {"name": "url", "type": "String"},
            {"name": "status_code", "type": "UInt16"},
            {"name": "timestamp", "type": "DateTime"}
        ],
        "partition_by": "toYYYYMMDD(timestamp)",
        "order_by": "timestamp",
        "ttl": "timestamp + INTERVAL 3 MONTH"
    }
]

for config in tables_config:
    success = lifecycle_manager.create_table_with_lifecycle(**config)
    print(f"   创建表 {config['database']}.{config['table_name']}: {'成功' if success else '失败'}")

# Step 2: add partitions as (database, table, partition_id, rows, size_bytes).
print("\n2. 添加分区:")
partitions_to_add = [
    ("analytics", "user_events", "202401", 1000000, 50*1024*1024),
    ("analytics", "user_events", "202402", 1200000, 60*1024*1024),
    ("logs", "access_logs", "20240101", 500000, 25*1024*1024),
    ("logs", "access_logs", "20240102", 600000, 30*1024*1024)
]

for db, table, partition_id, rows, size in partitions_to_add:
    success = lifecycle_manager.add_partition(db, table, partition_id, rows, size)
    print(f"   添加分区 {db}.{table}.{partition_id}: {'成功' if success else '失败'}")

# Step 3: schema changes as (database, table, alter_type, details).
print("\n3. 表结构修改:")
alter_operations = [
    ("analytics", "user_events", "ADD_COLUMN", {"column": {"name": "session_id", "type": "String"}}),
    ("logs", "access_logs", "MODIFY_TTL", {"ttl": "timestamp + INTERVAL 6 MONTH"})
]

for db, table, alter_type, details in alter_operations:
    success = lifecycle_manager.alter_table(db, table, alter_type, details)
    print(f"   修改表 {db}.{table} ({alter_type}): {'成功' if success else '失败'}")

print("\n4. 表操作:")
# Detach the table, then attach it again.
success = lifecycle_manager.detach_table("logs", "access_logs")
print(f"   分离表 logs.access_logs: {'成功' if success else '失败'}")

success = lifecycle_manager.attach_table("logs", "access_logs")
print(f"   附加表 logs.access_logs: {'成功' if success else '失败'}")

# Step 5: print the partitions of each table; sizes are shown in whole MB.
print("\n5. 查看分区信息:")
for db, table in [("analytics", "user_events"), ("logs", "access_logs")]:
    partitions = lifecycle_manager.get_table_partitions(db, table)
    print(f"\n   表 {db}.{table} 的分区:")
    for partition in partitions:
        print(f"     - {partition.partition_id}: {partition.rows_count:,}行, {partition.size_bytes//1024//1024}MB")

# Step 6: print the most recent logged operations (at most 10).
print("\n6. 操作历史:")
history = lifecycle_manager.get_operations_history(limit=10)
print(f"   最近{len(history)}个操作:")
for op in history:
    status = "成功" if op.success else "失败"
    print(f"     - {op.operation_type.value} {op.database}.{op.table_name}: {status}")

# Step 7: print the aggregate lifecycle statistics.
print("\n7. 生命周期统计:")
stats = lifecycle_manager.get_lifecycle_statistics()
print(f"   总操作数: {stats['total_operations']}")
print(f"   成功率: {stats['success_rate']}%")
print(f"   活跃表数: {stats['active_tables']}")
print(f"   总分区数: {stats['total_partitions']}")

print("\n   按操作类型分布:")
for op_type, count in stats['operations_by_type'].items():
    print(f"     {op_type}: {count}次")

2.2 数据类型

2.2.1 基础数据类型

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
import struct
import json
import datetime

class DataTypeCategory(Enum):
    """Categories used to group data types in the catalogue."""
    INTEGER = "integer"
    FLOAT = "float"
    STRING = "string"
    DATE_TIME = "datetime"
    BOOLEAN = "boolean"
    ARRAY = "array"
    TUPLE = "tuple"
    NESTED = "nested"
    SPECIAL = "special"

@dataclass
class DataTypeInfo:
    """Catalogue entry describing one data type."""
    name: str
    category: DataTypeCategory
    size_bytes: int  # -1 means variable length
    min_value: Optional[Union[int, float, str]] = None
    max_value: Optional[Union[int, float, str]] = None
    description: str = ""
    examples: Optional[List[Any]] = None  # None when no examples were supplied
    aliases: Optional[List[str]] = None  # alternative type names; None when there are none

class ClickHouseDataTypeManager:
    """ClickHouse数据类型管理器"""
    
    def __init__(self):
        """Build the type catalogue once and keep it on the instance."""
        self.data_types = self._initialize_data_types()
    
    def _initialize_data_types(self) -> Dict[str, DataTypeInfo]:
        """Build the lookup table of supported data types.

        Returns:
            A dict keyed by canonical type name and by every alias; an alias
            key maps to the same DataTypeInfo object as its canonical name.
        """
        cat = DataTypeCategory
        catalogue = [
            # Integer types
            DataTypeInfo(
                name="UInt8", category=cat.INTEGER, size_bytes=1,
                min_value=0, max_value=255,
                description="8位无符号整数",
                examples=[0, 100, 255], aliases=["TINYINT UNSIGNED"],
            ),
            DataTypeInfo(
                name="UInt16", category=cat.INTEGER, size_bytes=2,
                min_value=0, max_value=65535,
                description="16位无符号整数",
                examples=[0, 1000, 65535],
            ),
            DataTypeInfo(
                name="UInt32", category=cat.INTEGER, size_bytes=4,
                min_value=0, max_value=4294967295,
                description="32位无符号整数",
                examples=[0, 1000000, 4294967295],
            ),
            DataTypeInfo(
                name="UInt64", category=cat.INTEGER, size_bytes=8,
                min_value=0, max_value=18446744073709551615,
                description="64位无符号整数",
                examples=[0, 1000000000, 18446744073709551615],
            ),
            DataTypeInfo(
                name="Int8", category=cat.INTEGER, size_bytes=1,
                min_value=-128, max_value=127,
                description="8位有符号整数",
                examples=[-128, 0, 127], aliases=["TINYINT"],
            ),
            DataTypeInfo(
                name="Int16", category=cat.INTEGER, size_bytes=2,
                min_value=-32768, max_value=32767,
                description="16位有符号整数",
                examples=[-32768, 0, 32767], aliases=["SMALLINT"],
            ),
            DataTypeInfo(
                name="Int32", category=cat.INTEGER, size_bytes=4,
                min_value=-2147483648, max_value=2147483647,
                description="32位有符号整数",
                examples=[-2147483648, 0, 2147483647], aliases=["INT"],
            ),
            DataTypeInfo(
                name="Int64", category=cat.INTEGER, size_bytes=8,
                min_value=-9223372036854775808, max_value=9223372036854775807,
                description="64位有符号整数",
                examples=[-9223372036854775808, 0, 9223372036854775807],
                aliases=["BIGINT"],
            ),
            # Floating-point types
            DataTypeInfo(
                name="Float32", category=cat.FLOAT, size_bytes=4,
                description="32位浮点数",
                examples=[3.14, -2.5, 1.23e-4], aliases=["FLOAT"],
            ),
            DataTypeInfo(
                name="Float64", category=cat.FLOAT, size_bytes=8,
                description="64位浮点数",
                examples=[3.141592653589793, -2.5, 1.23e-10], aliases=["DOUBLE"],
            ),
            # String types
            DataTypeInfo(
                name="String", category=cat.STRING, size_bytes=-1,
                description="可变长度字符串",
                examples=["hello", "world", "ClickHouse"],
                aliases=["TEXT", "VARCHAR"],
            ),
            DataTypeInfo(
                name="FixedString(N)", category=cat.STRING,
                size_bytes=-1,  # depends on N
                description="固定长度字符串",
                examples=["hello", "world"], aliases=["CHAR(N)"],
            ),
            # Date and time types
            DataTypeInfo(
                name="Date", category=cat.DATE_TIME, size_bytes=2,
                # A 2-byte day counter starting at 1970-01-01 cannot reach
                # the 1900..2299 range, which belongs to Date32.
                min_value="1970-01-01", max_value="2149-06-06",
                description="日期类型,精度到天",
                examples=["2024-01-01", "2023-12-31"],
            ),
            DataTypeInfo(
                name="Date32", category=cat.DATE_TIME, size_bytes=4,
                min_value="1900-01-01", max_value="2299-12-31",
                description="扩展日期类型",
                examples=["2024-01-01", "1950-06-15"],
            ),
            DataTypeInfo(
                name="DateTime", category=cat.DATE_TIME, size_bytes=4,
                # Unsigned 32-bit Unix timestamp, so the range starts at the epoch.
                min_value="1970-01-01 00:00:00", max_value="2106-02-07 06:28:15",
                description="日期时间类型,精度到秒",
                examples=["2024-01-01 12:00:00", "2023-12-31 23:59:59"],
            ),
            DataTypeInfo(
                name="DateTime64", category=cat.DATE_TIME, size_bytes=8,
                description="高精度日期时间类型,支持亚秒精度",
                examples=["2024-01-01 12:00:00.123", "2023-12-31 23:59:59.999999"],
            ),
            # Boolean type
            DataTypeInfo(
                name="Bool", category=cat.BOOLEAN, size_bytes=1,
                description="布尔类型",
                examples=[True, False, 1, 0],
            ),
            # Array type
            DataTypeInfo(
                name="Array(T)", category=cat.ARRAY, size_bytes=-1,
                description="数组类型,元素类型为T",
                examples=[[1, 2, 3], ["a", "b", "c"], []],
            ),
            # Tuple type
            DataTypeInfo(
                name="Tuple(T1, T2, ...)", category=cat.TUPLE, size_bytes=-1,
                description="元组类型,包含多个不同类型的元素",
                examples=[(1, "hello"), (3.14, "world", True)],
            ),
            # Special types
            DataTypeInfo(
                name="UUID", category=cat.SPECIAL, size_bytes=16,
                description="UUID类型",
                examples=["550e8400-e29b-41d4-a716-446655440000"],
            ),
            DataTypeInfo(
                name="IPv4", category=cat.SPECIAL, size_bytes=4,
                description="IPv4地址类型",
                examples=["192.168.1.1", "10.0.0.1"],
            ),
            DataTypeInfo(
                name="IPv6", category=cat.SPECIAL, size_bytes=16,
                description="IPv6地址类型",
                examples=["2001:0db8:85a3:0000:0000:8a2e:0370:7334"],
            ),
            DataTypeInfo(
                name="Enum8", category=cat.SPECIAL, size_bytes=1,
                description="8位枚举类型",
                examples=["'red' = 1, 'green' = 2, 'blue' = 3"],
            ),
            DataTypeInfo(
                name="Enum16", category=cat.SPECIAL, size_bytes=2,
                description="16位枚举类型",
                examples=["'small' = 1, 'medium' = 2, 'large' = 3"],
            ),
        ]

        types: Dict[str, DataTypeInfo] = {}
        for data_type in catalogue:
            types[data_type.name] = data_type
            # Register aliases so lookups by alias return the same entry.
            for alias in data_type.aliases or ():
                types[alias] = data_type

        return types
    
    def get_data_type_info(self, type_name: str) -> Optional[DataTypeInfo]:
        """获取数据类型信息"""
        return self.data_types.get(type_name)
    
    def list_types_by_category(self, category: DataTypeCategory) -> List[DataTypeInfo]:
        """按分类列出数据类型"""
        return [dt for dt in self.data_types.values() if dt.category == category]
    
    def get_type_recommendations(self, use_case: str) -> List[Dict[str, Any]]:
        """Suggest data types for a free-text use-case description.

        The description is lower-cased and checked against each rule's
        keywords by substring match. Every matching rule contributes its
        suggestions, in rule order; a type may therefore appear more than once.
        """
        needle = use_case.lower()

        # (trigger keywords, ((type, reason), ...)) evaluated top to bottom.
        rules = (
            (("id", "主键"), (
                ("UInt64", "适合作为自增ID或大范围唯一标识"),
                ("UUID", "适合作为全局唯一标识符"),
            )),
            (("时间", "date", "time"), (
                ("DateTime", "适合存储精确到秒的时间戳"),
                ("DateTime64", "适合需要亚秒精度的时间戳"),
                ("Date", "适合只需要日期的场景"),
            )),
            (("金额", "价格", "money"), (
                ("Decimal(P, S)", "适合精确的货币计算"),
                ("Float64", "适合一般的数值计算"),
            )),
            (("计数", "count"), (
                ("UInt32", "适合一般的计数场景"),
                ("UInt64", "适合大数值的计数场景"),
            )),
            (("文本", "描述", "text"), (
                ("String", "适合可变长度的文本内容"),
                ("FixedString(N)", "适合固定长度的文本,如编码"),
            )),
            (("标签", "数组", "array"), (
                ("Array(String)", "适合存储标签列表"),
                ("Array(UInt32)", "适合存储ID列表"),
            )),
        )

        suggestions = []
        for keywords, candidates in rules:
            if any(keyword in needle for keyword in keywords):
                for type_name, reason in candidates:
                    suggestions.append({"type": type_name, "reason": reason})
        return suggestions
    
    def calculate_storage_size(self, type_name: str, value_count: int, 
                              avg_string_length: int = 50) -> Dict[str, Any]:
        """Estimate the raw storage needed for ``value_count`` values of a type.

        Args:
            type_name: Name passed to ``get_data_type_info``.
            value_count: Number of values to size.
            avg_string_length: Per-value byte estimate used for string types
                that have no fixed size.

        Returns:
            A dict with the total in bytes, KB and MB plus the per-value
            average, or ``{"error": ...}`` when the type is not registered.
        """
        info = self.get_data_type_info(type_name)
        if not info:
            return {"error": "未知的数据类型"}

        # Work out a per-value size first, then scale it once.
        if info.size_bytes > 0:
            # Fixed-width type: the registered size is exact.
            per_value = info.size_bytes
        elif info.category == DataTypeCategory.STRING:
            # Variable-width string: rely on the caller's average length.
            per_value = avg_string_length
        elif info.category == DataTypeCategory.ARRAY:
            # Array estimate: an average of 5 elements at 4 bytes each.
            per_value = 5 * 4
        else:
            # Fallback estimate for any other variable-width type.
            per_value = 8
        total_bytes = per_value * value_count

        if value_count > 0:
            bytes_per_value = round(total_bytes / value_count, 2)
        else:
            bytes_per_value = 0

        summary = {"type_name": type_name, "value_count": value_count}
        summary["total_bytes"] = total_bytes
        summary["total_kb"] = round(total_bytes / 1024, 2)
        summary["total_mb"] = round(total_bytes / 1024 / 1024, 2)
        summary["bytes_per_value"] = bytes_per_value
        return summary
    
    def get_type_conversion_matrix(self) -> Dict[str, List[str]]:
        """Return the table of conversion targets for each source type.

        Returns:
            A newly built dict mapping a source type name to the list of type
            names it is listed as convertible to. Keys are ordered: unsigned
            integers, signed integers, then float, string and date/time types.
        """
        unsigned_sources = {
            "UInt8": [
                "UInt16", "UInt32", "UInt64",
                "Int16", "Int32", "Int64",
                "Float32", "Float64",
            ],
            "UInt16": ["UInt32", "UInt64", "Int32", "Int64", "Float32", "Float64"],
            "UInt32": ["UInt64", "Int64", "Float64"],
            "UInt64": ["Float64"],
        }
        signed_sources = {
            "Int8": ["Int16", "Int32", "Int64", "Float32", "Float64"],
            "Int16": ["Int32", "Int64", "Float32", "Float64"],
            "Int32": ["Int64", "Float64"],
            "Int64": ["Float64"],
        }
        other_sources = {
            "Float32": ["Float64"],
            "String": ["FixedString(N)"],
            "Date": ["DateTime", "DateTime64"],
            "DateTime": ["DateTime64"],
        }
        # Merging in this order keeps the key order of the combined table.
        return {**unsigned_sources, **signed_sources, **other_sources}

# Data type management demo: walks through each public method of the manager.
print("\n=== ClickHouse数据类型管理 ===")

type_manager = ClickHouseDataTypeManager()

print("\n1. 按分类查看数据类型:")
for category in DataTypeCategory:
    types = type_manager.list_types_by_category(category)
    if not types:
        continue
    print(f"\n   {category.value.upper()}类型:")
    for dt in types[:3]:  # show at most the first three per category
        if dt.size_bytes > 0:
            size_info = f"{dt.size_bytes}字节"
        else:
            size_info = "可变长度"
        print(f"     - {dt.name}: {dt.description} ({size_info})")

print("\n2. 数据类型详细信息:")
detail_types = ["UInt64", "String", "DateTime", "Array(T)"]
for type_name in detail_types:
    type_info = type_manager.get_data_type_info(type_name)
    if not type_info:
        continue
    print(f"\n   {type_name}:")
    print(f"     描述: {type_info.description}")
    print(f"     大小: {type_info.size_bytes if type_info.size_bytes > 0 else '可变'}字节")
    if type_info.min_value is not None:
        print(f"     范围: {type_info.min_value} ~ {type_info.max_value}")
    if type_info.examples:
        print(f"     示例: {type_info.examples[:3]}")

print("\n3. 使用场景推荐:")
use_cases = ["用户ID", "创建时间", "商品价格", "访问计数", "用户标签"]
for use_case in use_cases:
    recommendations = type_manager.get_type_recommendations(use_case)
    print(f"\n   {use_case}:")
    for rec in recommendations[:2]:  # show only the top two suggestions
        print(f"     - {rec['type']}: {rec['reason']}")

print("\n4. 存储大小计算:")
# (type name, number of values, average string length)
storage_scenarios = [
    ("UInt64", 1000000, 0),
    ("String", 1000000, 100),
    ("DateTime", 1000000, 0),
    ("Array(T)", 100000, 0),
]

for type_name, count, avg_len in storage_scenarios:
    result = type_manager.calculate_storage_size(type_name, count, avg_len)
    if "error" in result:
        continue  # unknown types are skipped silently
    print(f"   {type_name} ({count:,}个值): {result['total_mb']}MB")

print("\n5. 类型转换建议:")
conversion_matrix = type_manager.get_type_conversion_matrix()
for source_type, target_types in list(conversion_matrix.items())[:3]:
    print(f"   {source_type} 可转换为: {', '.join(target_types[:3])}")

2.2.2 复合数据类型

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Union
import json

class ComplexTypeCategory(Enum):
    """Categories used to classify ClickHouse composite data types."""
    ARRAY = "array"  # Array(T)
    TUPLE = "tuple"  # Tuple(T1, T2, ...)
    MAP = "map"  # Map(K, V)
    NESTED = "nested"  # Nested(col1 Type1, col2 Type2, ...)
    NULLABLE = "nullable"  # Nullable(T)
    LOW_CARDINALITY = "low_cardinality"  # LowCardinality(T)
    DECIMAL = "decimal"  # Decimal(P, S) and the DecimalNN(S) forms
    ENUM = "enum"  # Enum8(...) / Enum16(...)

@dataclass
class ComplexTypeDefinition:
    """Descriptive record for one composite data type."""
    # Short type name; the manager uses it as the lookup key.
    name: str
    # Category this type is filed under.
    category: ComplexTypeCategory
    # Declaration syntax, e.g. "Array(T)".
    syntax: str
    # One-line human-readable description.
    description: str
    # Typical scenarios the type is suited to.
    use_cases: List[str]
    # Sample declarations; each dict carries a "definition" string and a
    # "values" list of literal strings.
    examples: List[Dict[str, Any]]
    # Free-text notes about performance characteristics (defaults to empty).
    performance_notes: List[str] = field(default_factory=list)
    # Free-text notes about known restrictions (defaults to empty).
    limitations: List[str] = field(default_factory=list)

class ClickHouseComplexTypeManager:
    """Catalogue of ClickHouse composite data types with lookup helpers.

    The catalogue is built once in ``__init__`` and stored in
    ``self.complex_types``, keyed by the short type name (``"Array"``,
    ``"Tuple"``, ...).
    """
    
    def __init__(self):
        self.complex_types = self._initialize_complex_types()
    
    def _initialize_complex_types(self) -> Dict[str, ComplexTypeDefinition]:
        """Build the catalogue of composite type definitions.

        Returns:
            A dict mapping each definition's ``name`` to the definition.
        """
        # Array
        array_type = ComplexTypeDefinition(
            name="Array",
            category=ComplexTypeCategory.ARRAY,
            syntax="Array(T)",
            description="一维数组,存储相同类型的多个值",
            use_cases=[
                "存储标签列表",
                "存储多个ID",
                "存储时间序列数据点",
                "存储用户行为序列"
            ],
            examples=[
                {
                    "definition": "Array(String)",
                    "values": ["['tag1', 'tag2', 'tag3']", "['red', 'blue']", "[]"]
                },
                {
                    "definition": "Array(UInt32)",
                    "values": ["[1, 2, 3, 4, 5]", "[100, 200]", "[]"]
                },
                {
                    "definition": "Array(Float64)",
                    "values": ["[3.14, 2.71, 1.41]", "[0.5, 1.5, 2.5]"]
                }
            ],
            performance_notes=[
                "数组元素在内存中连续存储",
                "支持向量化操作",
                "查询时可以使用数组函数进行高效处理"
            ],
            limitations=[
                "不支持嵌套数组(Array(Array(T)))",
                "数组大小有限制",
                "所有元素必须是相同类型"
            ]
        )
        
        # Tuple
        tuple_type = ComplexTypeDefinition(
            name="Tuple",
            category=ComplexTypeCategory.TUPLE,
            syntax="Tuple(T1, T2, ...)",
            description="存储固定数量的不同类型值",
            use_cases=[
                "存储坐标点(x, y)",
                "存储键值对",
                "存储复合标识符",
                "存储多维度指标"
            ],
            examples=[
                {
                    "definition": "Tuple(String, UInt32)",
                    "values": ["('user123', 25)", "('admin', 1)"]
                },
                {
                    "definition": "Tuple(Float64, Float64)",
                    "values": ["(39.9042, 116.4074)", "(40.7128, -74.0060)"]
                },
                {
                    "definition": "Tuple(String, UInt32, DateTime)",
                    "values": ["('order', 12345, '2024-01-01 12:00:00')"]
                }
            ],
            performance_notes=[
                "元组元素紧密打包存储",
                "访问特定元素效率高",
                "支持元组解构"
            ],
            limitations=[
                "元素数量和类型在创建时固定",
                "不支持动态添加或删除元素"
            ]
        )
        
        # Map
        map_type = ComplexTypeDefinition(
            name="Map",
            category=ComplexTypeCategory.MAP,
            syntax="Map(K, V)",
            description="键值对映射,类似于字典",
            use_cases=[
                "存储用户属性",
                "存储配置参数",
                "存储动态字段",
                "存储标签和值的映射"
            ],
            examples=[
                {
                    "definition": "Map(String, String)",
                    "values": ["{'name': 'John', 'city': 'Beijing'}", "{'color': 'red', 'size': 'large'}"]
                },
                {
                    "definition": "Map(String, UInt32)",
                    "values": ["{'age': 25, 'score': 95}", "{'count': 100, 'total': 1000}"]
                }
            ],
            performance_notes=[
                "键值对以数组形式存储",
                "支持高效的键查找",
                "压缩效果好"
            ],
            limitations=[
                "键的类型必须支持比较",
                "不保证键的顺序",
                "键必须唯一"
            ]
        )
        
        # Nested
        nested_type = ComplexTypeDefinition(
            name="Nested",
            category=ComplexTypeCategory.NESTED,
            syntax="Nested(col1 Type1, col2 Type2, ...)",
            description="嵌套表结构,存储结构化的子记录",
            use_cases=[
                "存储订单商品列表",
                "存储用户事件详情",
                "存储层级数据",
                "存储一对多关系数据"
            ],
            examples=[
                {
                    "definition": "Nested(product_id UInt32, quantity UInt16, price Float64)",
                    "values": ["[(1001, 2, 99.99), (1002, 1, 149.99)]"]
                },
                {
                    "definition": "Nested(event_type String, timestamp DateTime, value Float64)",
                    "values": ["[('click', '2024-01-01 12:00:00', 1.0), ('view', '2024-01-01 12:01:00', 0.5)]"]
                }
            ],
            performance_notes=[
                "列式存储嵌套数据",
                "支持高效的嵌套查询",
                "压缩效果优秀"
            ],
            limitations=[
                "嵌套层级有限制",
                "查询语法相对复杂",
                "不支持嵌套的嵌套"
            ]
        )
        
        # Nullable
        nullable_type = ComplexTypeDefinition(
            name="Nullable",
            category=ComplexTypeCategory.NULLABLE,
            syntax="Nullable(T)",
            description="允许NULL值的类型包装器",
            use_cases=[
                "可选字段",
                "数据清洗场景",
                "外部数据导入",
                "兼容性处理"
            ],
            examples=[
                {
                    "definition": "Nullable(String)",
                    "values": ["'hello'", "NULL", "'world'"]
                },
                {
                    "definition": "Nullable(UInt32)",
                    "values": ["123", "NULL", "456"]
                }
            ],
            performance_notes=[
                "额外存储NULL标记位",
                "查询时需要NULL检查",
                "影响压缩效果"
            ],
            limitations=[
                "性能开销较大",
                "不建议在高性能场景使用",
                "增加存储空间"
            ]
        )
        
        # LowCardinality
        low_cardinality_type = ComplexTypeDefinition(
            name="LowCardinality",
            category=ComplexTypeCategory.LOW_CARDINALITY,
            syntax="LowCardinality(T)",
            description="低基数类型,适合重复值较多的列",
            use_cases=[
                "状态字段",
                "分类字段",
                "枚举值",
                "国家/地区代码"
            ],
            examples=[
                {
                    "definition": "LowCardinality(String)",
                    "values": ["'active'", "'inactive'", "'pending'", "'active'"]
                },
                {
                    "definition": "LowCardinality(FixedString(2))",
                    "values": ["'CN'", "'US'", "'JP'", "'CN'"]
                }
            ],
            performance_notes=[
                "使用字典编码压缩",
                "查询性能优异",
                "内存使用效率高"
            ],
            limitations=[
                "只适合低基数数据(<10000个不同值)",
                "字典大小有限制",
                "不适合高基数数据"
            ]
        )
        
        # Decimal
        decimal_type = ComplexTypeDefinition(
            name="Decimal",
            category=ComplexTypeCategory.DECIMAL,
            syntax="Decimal(P, S) 或 Decimal32(S), Decimal64(S), Decimal128(S)",
            description="定点数类型,用于精确的数值计算",
            use_cases=[
                "货币金额",
                "财务计算",
                "精确的数值存储",
                "科学计算"
            ],
            examples=[
                {
                    "definition": "Decimal(10, 2)",
                    "values": ["123.45", "999.99", "0.01"]
                },
                {
                    "definition": "Decimal64(4)",
                    "values": ["1234.5678", "0.0001", "9999.9999"]
                }
            ],
            performance_notes=[
                "避免浮点数精度问题",
                "计算结果精确",
                "支持高精度运算"
            ],
            limitations=[
                "精度和范围有限制",
                "计算性能略低于浮点数",
                "存储空间较大"
            ]
        )
        
        # Enum
        enum_type = ComplexTypeDefinition(
            name="Enum",
            category=ComplexTypeCategory.ENUM,
            syntax="Enum8('name1' = value1, 'name2' = value2, ...) 或 Enum16(...)",
            description="枚举类型,预定义的命名常量集合",
            use_cases=[
                "状态字段",
                "类型分类",
                "选项列表",
                "固定值集合"
            ],
            examples=[
                {
                    "definition": "Enum8('pending' = 1, 'processing' = 2, 'completed' = 3, 'failed' = 4)",
                    "values": ["'pending'", "'completed'", "'failed'"]
                },
                {
                    "definition": "Enum16('small' = 100, 'medium' = 200, 'large' = 300, 'xlarge' = 400)",
                    "values": ["'small'", "'large'", "'medium'"]
                }
            ],
            performance_notes=[
                "内部存储为整数",
                "查询性能优秀",
                "内存使用效率高"
            ],
            limitations=[
                "值集合在创建时固定",
                "修改枚举值需要ALTER TABLE",
                "Enum8最多256个值,Enum16最多65536个值"
            ]
        )
        
        # Register every definition under its short name, in this order.
        all_types = [
            array_type, tuple_type, map_type, nested_type,
            nullable_type, low_cardinality_type, decimal_type, enum_type
        ]
        return {complex_type.name: complex_type for complex_type in all_types}
    
    def get_complex_type_info(self, type_name: str) -> Optional[ComplexTypeDefinition]:
        """Return the definition registered under ``type_name``, or ``None``."""
        return self.complex_types.get(type_name)
    
    def list_types_by_category(self, category: ComplexTypeCategory) -> List[ComplexTypeDefinition]:
        """Return every registered definition whose category equals ``category``."""
        return [ct for ct in self.complex_types.values() if ct.category == category]
    
    def get_type_recommendations_for_scenario(self, scenario: str) -> List[Dict[str, Any]]:
        """Suggest composite types for a free-text scenario description.

        The scenario is lower-cased and matched by substring against keyword
        groups; every matching group appends its suggestions in the order
        the groups appear below.

        Args:
            scenario: Free-text description of the data to store.

        Returns:
            A list of dicts with ``"type"``, ``"reason"`` and ``"example"``
            keys; empty when nothing matches.
        """
        recommendations = []
        scenario_lower = scenario.lower()
        
        # Tags
        if "标签" in scenario_lower or "tag" in scenario_lower:
            recommendations.append({
                "type": "Array(String)",
                "reason": "适合存储可变数量的标签",
                "example": "['tech', 'news', 'ai']"
            })
            recommendations.append({
                "type": "LowCardinality(String)",
                "reason": "如果标签种类有限,可以优化存储和查询性能",
                "example": "'technology'"
            })
        
        # Coordinates / locations
        if "坐标" in scenario_lower or "位置" in scenario_lower or "coordinate" in scenario_lower:
            recommendations.append({
                "type": "Tuple(Float64, Float64)",
                "reason": "适合存储经纬度坐标",
                "example": "(39.9042, 116.4074)"
            })
        
        # Attributes / configuration
        if "属性" in scenario_lower or "配置" in scenario_lower or "property" in scenario_lower:
            recommendations.append({
                "type": "Map(String, String)",
                "reason": "适合存储动态的键值对属性",
                "example": "{'color': 'red', 'size': 'large'}"
            })
        
        # Orders / products
        if "订单" in scenario_lower or "商品" in scenario_lower or "order" in scenario_lower:
            recommendations.append({
                "type": "Nested(product_id UInt32, quantity UInt16, price Decimal(10,2))",
                "reason": "适合存储订单中的商品列表",
                "example": "[(1001, 2, 99.99), (1002, 1, 149.99)]"
            })
        
        # Amounts / prices
        if "金额" in scenario_lower or "价格" in scenario_lower or "money" in scenario_lower:
            recommendations.append({
                "type": "Decimal(10, 2)",
                "reason": "适合精确的货币计算",
                "example": "123.45"
            })
        
        # Status / type fields
        if "状态" in scenario_lower or "类型" in scenario_lower or "status" in scenario_lower:
            recommendations.append({
                "type": "Enum8('pending' = 1, 'active' = 2, 'inactive' = 3)",
                "reason": "适合预定义的状态值",
                "example": "'active'"
            })
            recommendations.append({
                "type": "LowCardinality(String)",
                "reason": "如果状态值较少且重复度高",
                "example": "'active'"
            })
        
        return recommendations
    
    def analyze_type_performance(self, type_name: str, data_characteristics: Dict[str, Any]) -> Dict[str, Any]:
        """Rate a composite type against the described data characteristics.

        Only ``LowCardinality``, ``Array``, ``Nested`` and ``Nullable`` have
        dedicated rules. Any other registered type is returned with the
        neutral defaults (score ``0`` and ``"unknown"`` ratings).

        Args:
            type_name: Short name of a registered composite type.
            data_characteristics: Optional hints. ``"cardinality"`` (int) is
                read for ``LowCardinality``; ``"query_pattern"`` (str) is read
                for ``Array`` and ``Nested``.

        Returns:
            A dict with ``type_name``, ``category``, ``performance_score``,
            ``storage_efficiency``, ``query_performance`` and
            ``recommendations``, or ``{"error": ...}`` for an unknown type.
        """
        complex_type = self.get_complex_type_info(type_name)
        if not complex_type:
            return {"error": "未知的复合类型"}
        
        analysis = {
            "type_name": type_name,
            "category": complex_type.category.value,
            "performance_score": 0,
            "storage_efficiency": "unknown",
            "query_performance": "unknown",
            "recommendations": []
        }
        
        cardinality = data_characteristics.get("cardinality", "unknown")
        query_pattern = data_characteristics.get("query_pattern", "unknown")
        
        if type_name == "LowCardinality":
            if not isinstance(cardinality, int):
                # No usable cardinality was supplied: keep the neutral
                # defaults rather than claiming the cardinality is too high.
                analysis["recommendations"].append("未提供基数信息,无法评估LowCardinality是否适用")
            elif cardinality < 10000:
                analysis["performance_score"] = 9
                analysis["storage_efficiency"] = "excellent"
                analysis["query_performance"] = "excellent"
            else:
                analysis["performance_score"] = 3
                analysis["storage_efficiency"] = "poor"
                analysis["recommendations"].append("基数过高,建议使用普通String类型")
        
        elif type_name == "Array":
            if query_pattern == "element_access":
                analysis["performance_score"] = 8
                analysis["query_performance"] = "good"
            elif query_pattern == "aggregation":
                analysis["performance_score"] = 9
                analysis["query_performance"] = "excellent"
            else:
                analysis["performance_score"] = 7
        
        elif type_name == "Nested":
            if query_pattern == "nested_query":
                analysis["performance_score"] = 8
                analysis["query_performance"] = "good"
                analysis["storage_efficiency"] = "excellent"
            else:
                analysis["performance_score"] = 6
                analysis["recommendations"].append("考虑是否真的需要嵌套结构")
        
        elif type_name == "Nullable":
            analysis["performance_score"] = 5
            analysis["storage_efficiency"] = "fair"
            analysis["query_performance"] = "fair"
            analysis["recommendations"].append("尽量避免使用Nullable,考虑使用默认值")
        
        return analysis
    
    def generate_type_usage_examples(self, type_name: str) -> List[Dict[str, str]]:
        """Generate up to three CREATE / INSERT / SELECT snippets for a type.

        One snippet set is produced per sample value, walking the type's
        examples in order and stopping once three sets exist.

        Args:
            type_name: Short name of a registered composite type.

        Returns:
            A list of at most three dicts with ``"create_table"``,
            ``"insert_data"`` and ``"query_data"`` SQL strings; empty when
            the type is not registered.
        """
        complex_type = self.get_complex_type_info(type_name)
        if not complex_type:
            return []
        
        max_examples = 3
        usage_examples = []
        
        for example in complex_type.examples:
            for value in example["values"]:
                if len(usage_examples) >= max_examples:
                    return usage_examples
                # "data = NULL" evaluates to NULL and never matches a row, so
                # a NULL sample value has to be queried with IS NULL.
                if value.strip().upper() == "NULL":
                    predicate = "data IS NULL"
                else:
                    predicate = f"data = {value}"
                usage_examples.append({
                    "create_table": f"CREATE TABLE example_table (\n    id UInt32,\n    data {example['definition']}\n) ENGINE = MergeTree() ORDER BY id;",
                    "insert_data": f"INSERT INTO example_table VALUES (1, {value});",
                    "query_data": f"SELECT * FROM example_table WHERE {predicate};"
                })
        
        return usage_examples

# Composite data type management demo: walks through each public method of
# the manager and prints a short report for every step.
print("\n=== ClickHouse复合数据类型管理 ===")

complex_type_manager = ClickHouseComplexTypeManager()

print("\n1. 复合类型概览:")
for category in ComplexTypeCategory:
    types = complex_type_manager.list_types_by_category(category)
    if types:
        print(f"\n   {category.value.upper()}类型:")
        for ct in types:
            print(f"     - {ct.name}: {ct.description}")

print("\n2. 详细类型信息:")
detail_types = ["Array", "Tuple", "Map", "LowCardinality"]
for type_name in detail_types:
    type_info = complex_type_manager.get_complex_type_info(type_name)
    if type_info:
        print(f"\n   {type_name}:")
        print(f"     语法: {type_info.syntax}")
        print(f"     描述: {type_info.description}")
        print(f"     适用场景: {', '.join(type_info.use_cases[:2])}")
        if type_info.examples:
            print(f"     示例: {type_info.examples[0]['definition']}")

print("\n3. 场景推荐:")
scenarios = ["用户标签", "地理坐标", "订单商品", "商品价格", "订单状态"]
for scenario in scenarios:
    recommendations = complex_type_manager.get_type_recommendations_for_scenario(scenario)
    print(f"\n   {scenario}:")
    for rec in recommendations[:2]:  # show only the top two suggestions
        print(f"     - {rec['type']}: {rec['reason']}")

print("\n4. 性能分析:")
# (type name, data characteristics passed to the analyser)
performance_tests = [
    ("LowCardinality", {"cardinality": 100, "data_size": "large"}),
    ("Array", {"query_pattern": "aggregation", "data_size": "medium"}),
    ("Nullable", {"null_ratio": 0.3, "data_size": "large"})
]

for type_name, characteristics in performance_tests:
    analysis = complex_type_manager.analyze_type_performance(type_name, characteristics)
    if "error" not in analysis:
        print(f"\n   {type_name}:")
        print(f"     性能评分: {analysis['performance_score']}/10")
        print(f"     存储效率: {analysis['storage_efficiency']}")
        print(f"     查询性能: {analysis['query_performance']}")
        if analysis['recommendations']:
            print(f"     建议: {analysis['recommendations'][0]}")

print("\n5. 使用示例:")
for type_name in ["Array", "Tuple"]:
    examples = complex_type_manager.generate_type_usage_examples(type_name)
    if examples:
        print(f"\n   {type_name} 使用示例:")
        example = examples[0]
        # Show only the column list: cut the statement at the ENGINE clause.
        print(f"     创建表: {example['create_table'].split('ENGINE')[0].strip()}...")
        print(f"     插入数据: {example['insert_data']}")

总结

本章详细介绍了 ClickHouse 的基础概念,包括:

关键要点

  1. 数据库和表管理

    • 数据库是表的逻辑容器
    • 支持多种表引擎
    • 表的生命周期管理
    • 分区管理机制
  2. 数据类型系统

    • 丰富的基础数据类型
    • 强大的复合数据类型
    • 类型选择对性能的影响
    • 场景化的类型推荐
  3. 复合数据类型特点

    • Array:适合存储列表数据
    • Tuple:适合存储结构化数据
    • Map:适合存储键值对
    • Nested:适合存储嵌套结构
    • LowCardinality:优化低基数数据
    • Decimal:精确数值计算
    • Enum:预定义枚举值

最佳实践

  1. 类型选择原则

    • 根据数据特征选择合适的类型
    • 考虑查询模式和性能需求
    • 避免过度使用 Nullable 类型
    • 合理使用 LowCardinality 优化
  2. 表设计建议

    • 合理设计分区策略
    • 选择合适的排序键
    • 考虑数据压缩需求
    • 规划表的生命周期
  3. 性能优化

    • 了解各类型的性能特征
    • 根据查询模式优化数据结构
    • 监控和分析类型使用效果

下一步学习

  • SQL基础操作:学习 ClickHouse 的 SQL 语法
  • 表引擎详解:深入了解各种表引擎特性
  • 查询优化:掌握查询性能优化技巧
  • 实际应用:在项目中应用所学概念