# 2.1 数据库和表的概念
# 2.1.1 数据库管理
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Optional, Any
import time
import random
class DatabaseEngine(Enum):
    """Engines a database can be created with; each value is the engine's name string."""

    ORDINARY = "Ordinary"      # ordinary database
    MEMORY = "Memory"          # in-memory database
    LAZY = "Lazy"              # lazily loaded database
    MYSQL = "MySQL"            # MySQL database engine
    POSTGRESQL = "PostgreSQL"  # PostgreSQL database engine
class TableStatus(Enum):
    """States a table record can be in."""

    ACTIVE = "active"
    READONLY = "readonly"
    DROPPED = "dropped"
    DETACHED = "detached"
@dataclass
class DatabaseInfo:
    """Record describing a single database."""

    # Required identity / location fields.
    name: str
    engine: DatabaseEngine
    data_path: str
    metadata_path: str

    # Optional counters; all default to zero.
    tables_count: int = 0
    total_size_bytes: int = 0
    created_time: float = 0.0
@dataclass
class TableInfo:
    """Record describing a single table inside a database."""

    # Required identity fields.
    database: str
    name: str
    engine: str
    status: TableStatus

    # Optional size counters; all default to zero.
    rows_count: int = 0
    size_bytes: int = 0
    columns_count: int = 0
    partitions_count: int = 0

    # Optional timestamps; default to 0.0 when not supplied.
    created_time: float = 0.0
    last_modified: float = 0.0
class ClickHouseDatabaseManager:
    """In-memory model of databases and the tables registered in them.

    Attributes:
        databases: database name -> ``DatabaseInfo``.
        tables: database name -> list of ``TableInfo`` records. Dropped
            tables are kept in the list with status ``TableStatus.DROPPED``.
    """

    # Databases that drop_database() refuses to remove.
    _PROTECTED_DATABASES = ("system", "information_schema")

    def __init__(self):
        """Create the manager and register the built-in databases."""
        self.databases: Dict[str, DatabaseInfo] = {}
        self.tables: Dict[str, List[TableInfo]] = {}
        self._initialize_system_databases()

    def _initialize_system_databases(self):
        """Register ``system``, ``default`` and ``information_schema``."""
        # All built-in databases are stamped as created one day ago.
        created = time.time() - 86400
        system_databases = [
            DatabaseInfo(
                name="system",
                engine=DatabaseEngine.ORDINARY,
                data_path="/var/lib/clickhouse/data/system/",
                metadata_path="/var/lib/clickhouse/metadata/system/",
                tables_count=50,
                total_size_bytes=1024 * 1024 * 10,  # 10MB
                created_time=created,
            ),
            DatabaseInfo(
                name="default",
                engine=DatabaseEngine.ORDINARY,
                data_path="/var/lib/clickhouse/data/default/",
                metadata_path="/var/lib/clickhouse/metadata/default/",
                tables_count=0,
                total_size_bytes=0,
                created_time=created,
            ),
            DatabaseInfo(
                name="information_schema",
                engine=DatabaseEngine.MEMORY,
                data_path="",
                metadata_path="",
                tables_count=10,
                total_size_bytes=1024 * 1024,  # 1MB
                created_time=created,
            ),
        ]
        for db in system_databases:
            self.databases[db.name] = db
            self.tables[db.name] = []

    def create_database(self, name: str, engine: DatabaseEngine = DatabaseEngine.ORDINARY) -> bool:
        """Register a new database.

        Returns:
            True when created, False when a database called ``name`` already exists.
        """
        if name in self.databases:
            return False
        self.databases[name] = DatabaseInfo(
            name=name,
            engine=engine,
            data_path=f"/var/lib/clickhouse/data/{name}/",
            metadata_path=f"/var/lib/clickhouse/metadata/{name}/",
            created_time=time.time(),
        )
        self.tables[name] = []
        return True

    def drop_database(self, name: str) -> bool:
        """Remove a database together with its table records.

        Returns:
            False when the database is unknown or protected, True otherwise.
        """
        if name not in self.databases or name in self._PROTECTED_DATABASES:
            return False
        del self.databases[name]
        del self.tables[name]
        return True

    def list_databases(self) -> List[DatabaseInfo]:
        """Return all registered databases as a new list."""
        return list(self.databases.values())

    def get_database_info(self, name: str) -> Optional[DatabaseInfo]:
        """Return the record for ``name``, or None when it is not registered."""
        return self.databases.get(name)

    def create_table(self, database: str, table_name: str, engine: str) -> bool:
        """Register a new active table in ``database``.

        Returns:
            False when the database is unknown or an active table with the
            same name already exists; True when the table was created.
        """
        if database not in self.databases:
            return False
        # Reject duplicates so tables_count cannot be inflated by repeated
        # creation of the same table. A dropped table may be re-created.
        if self.get_table_info(database, table_name) is not None:
            return False
        now = time.time()
        self.tables[database].append(
            TableInfo(
                database=database,
                name=table_name,
                engine=engine,
                status=TableStatus.ACTIVE,
                created_time=now,
                last_modified=now,
            )
        )
        self.databases[database].tables_count += 1
        return True

    def drop_table(self, database: str, table_name: str) -> bool:
        """Mark an active table as dropped.

        Only ACTIVE records are considered, so dropping the same table twice
        (or dropping after a drop-and-recreate) cannot decrement
        ``tables_count`` more than once per live table.

        Returns:
            True when an active table was found and marked DROPPED, else False.
        """
        table = self.get_table_info(database, table_name)
        if table is None:
            return False
        table.status = TableStatus.DROPPED
        self.databases[database].tables_count -= 1
        return True

    def list_tables(self, database: str) -> List[TableInfo]:
        """Return the ACTIVE tables of ``database`` (empty list if unknown)."""
        return [t for t in self.tables.get(database, []) if t.status == TableStatus.ACTIVE]

    def get_table_info(self, database: str, table_name: str) -> Optional[TableInfo]:
        """Return the ACTIVE table called ``table_name``, or None if there is none."""
        for table in self.tables.get(database, []):
            if table.name == table_name and table.status == TableStatus.ACTIVE:
                return table
        return None

    def get_database_statistics(self) -> Dict[str, Any]:
        """Summarise database count, active tracked tables, total size and rankings.

        ``total_tables`` counts the ACTIVE records held in ``self.tables``; it
        is not derived from the ``tables_count`` fields.
        """
        total_tables = sum(len(self.list_tables(db_name)) for db_name in self.databases)
        total_size = sum(db.total_size_bytes for db in self.databases.values())
        return {
            "total_databases": len(self.databases),
            "total_tables": total_tables,
            "total_size_mb": round(total_size / 1024 / 1024, 2),
            "databases_by_engine": self._get_databases_by_engine(),
            "largest_databases": self._get_largest_databases(),
        }

    def _get_databases_by_engine(self) -> Dict[str, int]:
        """Count databases per engine value."""
        engine_count: Dict[str, int] = {}
        for db in self.databases.values():
            engine = db.engine.value
            engine_count[engine] = engine_count.get(engine, 0) + 1
        return engine_count

    def _get_largest_databases(self, limit: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``limit`` databases ordered by size, largest first."""
        sorted_dbs = sorted(
            self.databases.values(), key=lambda x: x.total_size_bytes, reverse=True
        )
        return [
            {
                "name": db.name,
                "size_mb": round(db.total_size_bytes / 1024 / 1024, 2),
                "tables_count": db.tables_count,
            }
            for db in sorted_dbs[:limit]
        ]
# Database management demo: walks through ClickHouseDatabaseManager.
print("=== ClickHouse数据库管理 ===")
db_manager = ClickHouseDatabaseManager()

# 1. Databases present right after construction.
print("\n1. 系统数据库列表:")
for db in db_manager.list_databases():
    print(f" - {db.name} ({db.engine.value}): {db.tables_count}个表")

# 2. Create the business databases.
print("\n2. 创建业务数据库:")
business_dbs = ["analytics", "logs", "metrics"]
for db_name in business_dbs:
    success = db_manager.create_database(db_name)
    print(f" 创建数据库 {db_name}: {'成功' if success else '失败'}")

# 3. Create sample tables as (database, table, engine) triples.
print("\n3. 创建示例表:")
tables_to_create = [
    ("analytics", "user_events", "MergeTree"),
    ("analytics", "page_views", "MergeTree"),
    ("logs", "access_logs", "MergeTree"),
    ("logs", "error_logs", "MergeTree"),
    ("metrics", "system_metrics", "SummingMergeTree"),
]
for db, table, engine in tables_to_create:
    success = db_manager.create_table(db, table, engine)
    print(f" 创建表 {db}.{table} ({engine}): {'成功' if success else '失败'}")

# 4. Aggregate statistics.
print("\n4. 数据库统计信息:")
stats = db_manager.get_database_statistics()
print(f" 总数据库数: {stats['total_databases']}")
print(f" 总表数: {stats['total_tables']}")
print(f" 总大小: {stats['total_size_mb']} MB")
print("\n 按引擎分布:")
for engine, count in stats['databases_by_engine'].items():
    print(f" {engine}: {count}个数据库")

# 5. Tables of selected databases.
print("\n5. 查看特定数据库的表:")
for db_name in ["analytics", "logs"]:
    tables = db_manager.list_tables(db_name)
    print(f"\n 数据库 {db_name} 的表:")
    for table in tables:
        print(f" - {table.name} ({table.engine})")
# 2.1.2 表的生命周期管理
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
import time
import json
class TableOperationType(Enum):
    """Kinds of operation that can be recorded against a table."""

    CREATE = "create"
    ALTER = "alter"
    DROP = "drop"
    DETACH = "detach"
    ATTACH = "attach"
    RENAME = "rename"
    TRUNCATE = "truncate"


class PartitionStatus(Enum):
    """States a partition record can be in."""

    ACTIVE = "active"
    INACTIVE = "inactive"
    READONLY = "readonly"
    OUTDATED = "outdated"


@dataclass
class TableOperation:
    """Log record describing one operation performed on a table.

    The operation kinds live in ``TableOperationType``. They are also exposed
    here as class attributes (``TableOperation.CREATE`` ...) because this
    record class and the operation-kind enum were both declared under the
    name ``TableOperation``; the later declaration replaced the earlier one
    and ``TableOperation.CREATE`` then failed with AttributeError.
    """

    operation_type: TableOperationType
    database: str
    table_name: str
    timestamp: float
    user: str
    details: Dict[str, Any] = field(default_factory=dict)
    success: bool = True
    error_message: Optional[str] = None

    # Un-annotated on purpose: dataclass only turns annotated names into
    # fields, so these stay plain class-level aliases of the enum members.
    CREATE = TableOperationType.CREATE
    ALTER = TableOperationType.ALTER
    DROP = TableOperationType.DROP
    DETACH = TableOperationType.DETACH
    ATTACH = TableOperationType.ATTACH
    RENAME = TableOperationType.RENAME
    TRUNCATE = TableOperationType.TRUNCATE
@dataclass
class PartitionInfo:
    """Record describing one partition of a table."""

    # Required identity fields.
    partition_id: str
    table: str
    database: str
    status: PartitionStatus

    # Optional size counters.
    rows_count: int = 0
    size_bytes: int = 0

    # Optional date bounds; None when not supplied.
    min_date: Optional[str] = None
    max_date: Optional[str] = None

    # Optional timestamps; default to 0.0 when not supplied.
    created_time: float = 0.0
    last_modified: float = 0.0
class ClickHouseTableLifecycleManager:
    """In-memory tracker of table metadata, partitions and an operation log.

    A table whose metadata status is ``"dropped"`` is treated as absent by
    every method: it cannot be altered, detached, attached or dropped again,
    but it can be created again.

    Attributes:
        operations_log: recorded ``TableOperation`` entries, oldest first.
        partitions: ``"database.table"`` -> list of ``PartitionInfo``.
        table_metadata: ``"database.table"`` -> metadata dict.
    """

    def __init__(self):
        """Start with no tables, no partitions and an empty log."""
        self.operations_log: List[TableOperation] = []
        self.partitions: Dict[str, List[PartitionInfo]] = {}  # table_key -> partitions
        self.table_metadata: Dict[str, Dict[str, Any]] = {}  # table_key -> metadata

    def _get_table_key(self, database: str, table_name: str) -> str:
        """Return the ``"database.table"`` key used by the internal dicts."""
        return f"{database}.{table_name}"

    def _live_metadata(self, table_key: str) -> Optional[Dict[str, Any]]:
        """Return the metadata for ``table_key`` unless it is missing or dropped."""
        metadata = self.table_metadata.get(table_key)
        if metadata is None or metadata["status"] == "dropped":
            return None
        return metadata

    def _new_operation(self, operation_type, database: str, table_name: str,
                       details: Optional[Dict[str, Any]] = None) -> TableOperation:
        """Build a log record stamped with the current time and user ``admin``."""
        return TableOperation(
            operation_type=operation_type,
            database=database,
            table_name=table_name,
            timestamp=time.time(),
            user="admin",
            details={} if details is None else details,
        )

    def _log(self, operation: TableOperation, error: Optional[str] = None) -> bool:
        """Append ``operation`` to the log; it is a failure iff ``error`` is given."""
        operation.success = error is None
        operation.error_message = error
        self.operations_log.append(operation)
        return operation.success

    def create_table_with_lifecycle(self, database: str, table_name: str,
                                    engine: str, columns: List[Dict[str, str]],
                                    partition_by: Optional[str] = None,
                                    order_by: Optional[str] = None,
                                    ttl: Optional[str] = None) -> bool:
        """Register a table with its lifecycle settings and log the attempt.

        Returns:
            True when the table was registered. False (with a failed log
            entry) when a non-dropped table with the same key already exists,
            so an accidental re-create cannot discard its partitions.
        """
        table_key = self._get_table_key(database, table_name)
        operation = self._new_operation(
            TableOperation.CREATE, database, table_name,
            {
                "engine": engine,
                # Copy: the log entry must not change when the table is altered later.
                "columns": list(columns),
                "partition_by": partition_by,
                "order_by": order_by,
                "ttl": ttl,
            },
        )
        if self._live_metadata(table_key) is not None:
            return self._log(operation, f"table {table_key} already exists")

        self.table_metadata[table_key] = {
            "database": database,
            "table_name": table_name,
            "engine": engine,
            # Copy: ADD_COLUMN appends to this list and must not touch the caller's.
            "columns": list(columns),
            "partition_by": partition_by,
            "order_by": order_by,
            "ttl": ttl,
            "created_time": time.time(),
            "status": "active",
        }
        self.partitions[table_key] = []
        return self._log(operation)

    def alter_table(self, database: str, table_name: str,
                    alter_type: str, details: Dict[str, Any]) -> bool:
        """Apply ``ADD_COLUMN``, ``DROP_COLUMN`` or ``MODIFY_TTL`` to a table.

        Expected keys in ``details``: ``column`` for ADD_COLUMN,
        ``column_name`` for DROP_COLUMN, ``ttl`` for MODIFY_TTL.

        Returns:
            False without logging when the table is unknown or dropped.
            False with a failed log entry when ``alter_type`` is unsupported
            or a required key is missing. True otherwise.
        """
        table_key = self._get_table_key(database, table_name)
        metadata = self._live_metadata(table_key)
        if metadata is None:
            return False
        operation = self._new_operation(
            TableOperation.ALTER, database, table_name,
            {"alter_type": alter_type, **details},
        )
        try:
            if alter_type == "ADD_COLUMN":
                metadata["columns"].append(details["column"])
            elif alter_type == "DROP_COLUMN":
                dropped_name = details["column_name"]
                metadata["columns"] = [
                    col for col in metadata["columns"] if col["name"] != dropped_name
                ]
            elif alter_type == "MODIFY_TTL":
                metadata["ttl"] = details["ttl"]
            else:
                # Previously an unknown alter_type changed nothing but was
                # still reported and logged as a success.
                raise ValueError(f"unsupported alter_type: {alter_type}")
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            return self._log(operation, str(e))
        return self._log(operation)

    def drop_table(self, database: str, table_name: str) -> bool:
        """Mark a table as dropped, discard its partitions and log the attempt.

        Returns:
            True when a live table was dropped. False (with a failed log
            entry) when the table is unknown or already dropped.
        """
        table_key = self._get_table_key(database, table_name)
        operation = self._new_operation(TableOperation.DROP, database, table_name)
        metadata = self._live_metadata(table_key)
        if metadata is None:
            return self._log(operation, f"table {table_key} does not exist")
        metadata["status"] = "dropped"
        self.partitions.pop(table_key, None)
        return self._log(operation)

    def detach_table(self, database: str, table_name: str) -> bool:
        """Set a live table's status to ``"detached"`` and log it.

        Returns:
            False without logging when the table is unknown or dropped.
        """
        table_key = self._get_table_key(database, table_name)
        metadata = self._live_metadata(table_key)
        if metadata is None:
            return False
        operation = self._new_operation(TableOperation.DETACH, database, table_name)
        metadata["status"] = "detached"
        return self._log(operation)

    def attach_table(self, database: str, table_name: str) -> bool:
        """Set a live table's status back to ``"active"`` and log it.

        Returns:
            False without logging when the table is unknown or dropped.
        """
        table_key = self._get_table_key(database, table_name)
        metadata = self._live_metadata(table_key)
        if metadata is None:
            return False
        operation = self._new_operation(TableOperation.ATTACH, database, table_name)
        metadata["status"] = "active"
        return self._log(operation)

    def add_partition(self, database: str, table_name: str,
                      partition_id: str, rows_count: int = 0,
                      size_bytes: int = 0) -> bool:
        """Append an ACTIVE partition record to a table.

        Returns:
            False when the table has no partition list (never created, or dropped).
        """
        table_key = self._get_table_key(database, table_name)
        if table_key not in self.partitions:
            return False
        now = time.time()
        self.partitions[table_key].append(
            PartitionInfo(
                partition_id=partition_id,
                table=table_name,
                database=database,
                status=PartitionStatus.ACTIVE,
                rows_count=rows_count,
                size_bytes=size_bytes,
                created_time=now,
                last_modified=now,
            )
        )
        return True

    def drop_partition(self, database: str, table_name: str, partition_id: str) -> bool:
        """Mark the first partition with ``partition_id`` as INACTIVE.

        The record is kept in the list; only its status changes.

        Returns:
            True when a matching partition was found, else False.
        """
        table_key = self._get_table_key(database, table_name)
        for partition in self.partitions.get(table_key, []):
            if partition.partition_id == partition_id:
                partition.status = PartitionStatus.INACTIVE
                return True
        return False

    def get_table_partitions(self, database: str, table_name: str) -> List[PartitionInfo]:
        """Return the partition records of a table (all statuses), or ``[]``."""
        table_key = self._get_table_key(database, table_name)
        return self.partitions.get(table_key, [])

    def get_operations_history(self, database: Optional[str] = None,
                               table_name: Optional[str] = None,
                               limit: int = 100) -> List[TableOperation]:
        """Return up to ``limit`` log entries, newest first.

        Args:
            database: when truthy, keep only entries for this database.
            table_name: when truthy, keep only entries for this table name.
            limit: maximum number of entries returned.
        """
        operations = self.operations_log
        if database:
            operations = [op for op in operations if op.database == database]
        if table_name:
            operations = [op for op in operations if op.table_name == table_name]
        return sorted(operations, key=lambda x: x.timestamp, reverse=True)[:limit]

    def get_lifecycle_statistics(self) -> Dict[str, Any]:
        """Summarise the operation log, active tables and partition records."""
        total_operations = len(self.operations_log)
        successful_operations = sum(1 for op in self.operations_log if op.success)

        operations_by_type: Dict[str, int] = {}
        for op in self.operations_log:
            op_type = op.operation_type.value
            operations_by_type[op_type] = operations_by_type.get(op_type, 0) + 1

        active_tables = sum(
            1 for meta in self.table_metadata.values() if meta["status"] == "active"
        )
        total_partitions = sum(len(parts) for parts in self.partitions.values())

        # Guard against division by zero when nothing has been logged yet.
        if total_operations > 0:
            success_rate = round(successful_operations / total_operations * 100, 2)
        else:
            success_rate = 0

        return {
            "total_operations": total_operations,
            "successful_operations": successful_operations,
            "success_rate": success_rate,
            "operations_by_type": operations_by_type,
            "active_tables": active_tables,
            "total_partitions": total_partitions,
        }
# Table lifecycle demo: walks through ClickHouseTableLifecycleManager.
print("\n=== ClickHouse表生命周期管理 ===")
lifecycle_manager = ClickHouseTableLifecycleManager()

# 1. Create tables; each config dict is passed as keyword arguments.
print("\n1. 创建表:")
tables_config = [
    {
        "database": "analytics",
        "table_name": "user_events",
        "engine": "MergeTree",
        "columns": [
            {"name": "user_id", "type": "UInt64"},
            {"name": "event_type", "type": "String"},
            {"name": "timestamp", "type": "DateTime"},
            {"name": "properties", "type": "String"},
        ],
        "partition_by": "toYYYYMM(timestamp)",
        "order_by": "(user_id, timestamp)",
        "ttl": "timestamp + INTERVAL 1 YEAR",
    },
    {
        "database": "logs",
        "table_name": "access_logs",
        "engine": "MergeTree",
        "columns": [
            {"name": "ip", "type": "String"},
            {"name": "method", "type": "String"},
            {"name": "url", "type": "String"},
            {"name": "status_code", "type": "UInt16"},
            {"name": "timestamp", "type": "DateTime"},
        ],
        "partition_by": "toYYYYMMDD(timestamp)",
        "order_by": "timestamp",
        "ttl": "timestamp + INTERVAL 3 MONTH",
    },
]
for config in tables_config:
    success = lifecycle_manager.create_table_with_lifecycle(**config)
    print(f" 创建表 {config['database']}.{config['table_name']}: {'成功' if success else '失败'}")

# 2. Add partitions as (database, table, partition id, rows, size in bytes).
print("\n2. 添加分区:")
partitions_to_add = [
    ("analytics", "user_events", "202401", 1000000, 50 * 1024 * 1024),
    ("analytics", "user_events", "202402", 1200000, 60 * 1024 * 1024),
    ("logs", "access_logs", "20240101", 500000, 25 * 1024 * 1024),
    ("logs", "access_logs", "20240102", 600000, 30 * 1024 * 1024),
]
for db, table, partition_id, rows, size in partitions_to_add:
    success = lifecycle_manager.add_partition(db, table, partition_id, rows, size)
    print(f" 添加分区 {db}.{table}.{partition_id}: {'成功' if success else '失败'}")

# 3. Alter table structure.
print("\n3. 表结构修改:")
alter_operations = [
    ("analytics", "user_events", "ADD_COLUMN", {"column": {"name": "session_id", "type": "String"}}),
    ("logs", "access_logs", "MODIFY_TTL", {"ttl": "timestamp + INTERVAL 6 MONTH"}),
]
for db, table, alter_type, details in alter_operations:
    success = lifecycle_manager.alter_table(db, table, alter_type, details)
    print(f" 修改表 {db}.{table} ({alter_type}): {'成功' if success else '失败'}")

# 4. Detach a table and attach it again.
print("\n4. 表操作:")
success = lifecycle_manager.detach_table("logs", "access_logs")
print(f" 分离表 logs.access_logs: {'成功' if success else '失败'}")
success = lifecycle_manager.attach_table("logs", "access_logs")
print(f" 附加表 logs.access_logs: {'成功' if success else '失败'}")

# 5. Show partitions per table.
print("\n5. 查看分区信息:")
for db, table in [("analytics", "user_events"), ("logs", "access_logs")]:
    partitions = lifecycle_manager.get_table_partitions(db, table)
    print(f"\n 表 {db}.{table} 的分区:")
    for partition in partitions:
        print(f" - {partition.partition_id}: {partition.rows_count:,}行, {partition.size_bytes//1024//1024}MB")

# 6. Most recent operations.
print("\n6. 操作历史:")
history = lifecycle_manager.get_operations_history(limit=10)
print(f" 最近{len(history)}个操作:")
for op in history:
    status = "成功" if op.success else "失败"
    print(f" - {op.operation_type.value} {op.database}.{op.table_name}: {status}")

# 7. Aggregate lifecycle statistics.
print("\n7. 生命周期统计:")
stats = lifecycle_manager.get_lifecycle_statistics()
print(f" 总操作数: {stats['total_operations']}")
print(f" 成功率: {stats['success_rate']}%")
print(f" 活跃表数: {stats['active_tables']}")
print(f" 总分区数: {stats['total_partitions']}")
print("\n 按操作类型分布:")
for op_type, count in stats['operations_by_type'].items():
    print(f" {op_type}: {count}次")
# 2.2 数据类型
# 2.2.1 基础数据类型
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
import struct
import json
import datetime
class DataTypeCategory(Enum):
    """Categories used to group data types."""

    INTEGER = "integer"
    FLOAT = "float"
    STRING = "string"
    DATE_TIME = "datetime"
    BOOLEAN = "boolean"
    ARRAY = "array"
    TUPLE = "tuple"
    NESTED = "nested"
    SPECIAL = "special"
@dataclass
class DataTypeInfo:
    """Static description of one data type."""

    name: str
    category: DataTypeCategory
    size_bytes: int  # -1 means variable length
    min_value: Optional[Union[int, float, str]] = None
    max_value: Optional[Union[int, float, str]] = None
    description: str = ""
    # Both default to None (not an empty list), so the annotations are
    # Optional; callers test them with a plain truthiness check.
    examples: Optional[List[Any]] = None
    aliases: Optional[List[str]] = None
class ClickHouseDataTypeManager:
    """Catalogue of basic ClickHouse data types with lookup and sizing helpers.

    ``data_types`` maps every canonical type name and every alias to the same
    ``DataTypeInfo`` object, so a type with aliases appears under several keys.
    """

    # (trigger keywords, ((type, reason), ...)). Rules are checked in order; a
    # rule fires when any keyword is a substring of the lower-cased use case.
    _RECOMMENDATION_RULES = (
        (("id", "主键"), (
            ("UInt64", "适合作为自增ID或大范围唯一标识"),
            ("UUID", "适合作为全局唯一标识符"),
        )),
        (("时间", "date", "time"), (
            ("DateTime", "适合存储精确到秒的时间戳"),
            ("DateTime64", "适合需要亚秒精度的时间戳"),
            ("Date", "适合只需要日期的场景"),
        )),
        (("金额", "价格", "money"), (
            ("Decimal(P, S)", "适合精确的货币计算"),
            ("Float64", "适合一般的数值计算"),
        )),
        (("计数", "count"), (
            ("UInt32", "适合一般的计数场景"),
            ("UInt64", "适合大数值的计数场景"),
        )),
        (("文本", "描述", "text"), (
            ("String", "适合可变长度的文本内容"),
            ("FixedString(N)", "适合固定长度的文本,如编码"),
        )),
        (("标签", "数组", "array"), (
            ("Array(String)", "适合存储标签列表"),
            ("Array(UInt32)", "适合存储ID列表"),
        )),
    )

    def __init__(self):
        """Build the type catalogue."""
        self.data_types = self._initialize_data_types()

    def _initialize_data_types(self) -> Dict[str, DataTypeInfo]:
        """Return the name/alias -> ``DataTypeInfo`` mapping."""
        catalogue = (
            self._integer_types() + self._float_types() + self._string_types()
            + self._datetime_types() + self._boolean_types() + self._array_types()
            + self._tuple_types() + self._special_types()
        )
        types: Dict[str, DataTypeInfo] = {}
        for data_type in catalogue:
            types[data_type.name] = data_type
            # Register aliases right after the canonical name.
            for alias in data_type.aliases or ():
                types[alias] = data_type
        return types

    @staticmethod
    def _integer_types() -> List[DataTypeInfo]:
        """Unsigned and signed integer types, 8 to 64 bits."""
        integer = DataTypeCategory.INTEGER
        return [
            DataTypeInfo(name="UInt8", category=integer, size_bytes=1,
                         min_value=0, max_value=255, description="8位无符号整数",
                         examples=[0, 100, 255], aliases=["TINYINT UNSIGNED"]),
            DataTypeInfo(name="UInt16", category=integer, size_bytes=2,
                         min_value=0, max_value=65535, description="16位无符号整数",
                         examples=[0, 1000, 65535]),
            DataTypeInfo(name="UInt32", category=integer, size_bytes=4,
                         min_value=0, max_value=4294967295, description="32位无符号整数",
                         examples=[0, 1000000, 4294967295]),
            DataTypeInfo(name="UInt64", category=integer, size_bytes=8,
                         min_value=0, max_value=18446744073709551615,
                         description="64位无符号整数",
                         examples=[0, 1000000000, 18446744073709551615]),
            DataTypeInfo(name="Int8", category=integer, size_bytes=1,
                         min_value=-128, max_value=127, description="8位有符号整数",
                         examples=[-128, 0, 127], aliases=["TINYINT"]),
            DataTypeInfo(name="Int16", category=integer, size_bytes=2,
                         min_value=-32768, max_value=32767, description="16位有符号整数",
                         examples=[-32768, 0, 32767], aliases=["SMALLINT"]),
            DataTypeInfo(name="Int32", category=integer, size_bytes=4,
                         min_value=-2147483648, max_value=2147483647,
                         description="32位有符号整数",
                         examples=[-2147483648, 0, 2147483647], aliases=["INT"]),
            DataTypeInfo(name="Int64", category=integer, size_bytes=8,
                         min_value=-9223372036854775808, max_value=9223372036854775807,
                         description="64位有符号整数",
                         examples=[-9223372036854775808, 0, 9223372036854775807],
                         aliases=["BIGINT"]),
        ]

    @staticmethod
    def _float_types() -> List[DataTypeInfo]:
        """Floating point types."""
        return [
            DataTypeInfo(name="Float32", category=DataTypeCategory.FLOAT, size_bytes=4,
                         description="32位浮点数", examples=[3.14, -2.5, 1.23e-4],
                         aliases=["FLOAT"]),
            DataTypeInfo(name="Float64", category=DataTypeCategory.FLOAT, size_bytes=8,
                         description="64位浮点数",
                         examples=[3.141592653589793, -2.5, 1.23e-10],
                         aliases=["DOUBLE"]),
        ]

    @staticmethod
    def _string_types() -> List[DataTypeInfo]:
        """String types; both are recorded as variable length (-1)."""
        return [
            DataTypeInfo(name="String", category=DataTypeCategory.STRING, size_bytes=-1,
                         description="可变长度字符串",
                         examples=["hello", "world", "ClickHouse"],
                         aliases=["TEXT", "VARCHAR"]),
            DataTypeInfo(name="FixedString(N)", category=DataTypeCategory.STRING,
                         size_bytes=-1,  # depends on N
                         description="固定长度字符串", examples=["hello", "world"],
                         aliases=["CHAR(N)"]),
        ]

    @staticmethod
    def _datetime_types() -> List[DataTypeInfo]:
        """Date and time types."""
        date_time = DataTypeCategory.DATE_TIME
        return [
            # Date is a 2-byte day counter starting at 1970-01-01, so it can
            # only reach 2149-06-06; the 1900..2299 range belongs to Date32.
            DataTypeInfo(name="Date", category=date_time, size_bytes=2,
                         min_value="1970-01-01", max_value="2149-06-06",
                         description="日期类型,精度到天",
                         examples=["2024-01-01", "2023-12-31"]),
            DataTypeInfo(name="Date32", category=date_time, size_bytes=4,
                         min_value="1900-01-01", max_value="2299-12-31",
                         description="扩展日期类型",
                         examples=["2024-01-01", "1950-06-15"]),
            # DateTime is a 4-byte Unix timestamp, so its range starts at the epoch.
            DataTypeInfo(name="DateTime", category=date_time, size_bytes=4,
                         min_value="1970-01-01 00:00:00",
                         max_value="2106-02-07 06:28:15",
                         description="日期时间类型,精度到秒",
                         examples=["2024-01-01 12:00:00", "2023-12-31 23:59:59"]),
            DataTypeInfo(name="DateTime64", category=date_time, size_bytes=8,
                         description="高精度日期时间类型,支持亚秒精度",
                         examples=["2024-01-01 12:00:00.123",
                                   "2023-12-31 23:59:59.999999"]),
        ]

    @staticmethod
    def _boolean_types() -> List[DataTypeInfo]:
        """Boolean type."""
        return [
            DataTypeInfo(name="Bool", category=DataTypeCategory.BOOLEAN, size_bytes=1,
                         description="布尔类型", examples=[True, False, 1, 0]),
        ]

    @staticmethod
    def _array_types() -> List[DataTypeInfo]:
        """Array type."""
        return [
            DataTypeInfo(name="Array(T)", category=DataTypeCategory.ARRAY, size_bytes=-1,
                         description="数组类型,元素类型为T",
                         examples=[[1, 2, 3], ["a", "b", "c"], []]),
        ]

    @staticmethod
    def _tuple_types() -> List[DataTypeInfo]:
        """Tuple type."""
        return [
            DataTypeInfo(name="Tuple(T1, T2, ...)", category=DataTypeCategory.TUPLE,
                         size_bytes=-1,
                         description="元组类型,包含多个不同类型的元素",
                         examples=[(1, "hello"), (3.14, "world", True)]),
        ]

    @staticmethod
    def _special_types() -> List[DataTypeInfo]:
        """UUID, IP address and enum types."""
        special = DataTypeCategory.SPECIAL
        return [
            DataTypeInfo(name="UUID", category=special, size_bytes=16,
                         description="UUID类型",
                         examples=["550e8400-e29b-41d4-a716-446655440000"]),
            DataTypeInfo(name="IPv4", category=special, size_bytes=4,
                         description="IPv4地址类型",
                         examples=["192.168.1.1", "10.0.0.1"]),
            DataTypeInfo(name="IPv6", category=special, size_bytes=16,
                         description="IPv6地址类型",
                         examples=["2001:0db8:85a3:0000:0000:8a2e:0370:7334"]),
            DataTypeInfo(name="Enum8", category=special, size_bytes=1,
                         description="8位枚举类型",
                         examples=["'red' = 1, 'green' = 2, 'blue' = 3"]),
            DataTypeInfo(name="Enum16", category=special, size_bytes=2,
                         description="16位枚举类型",
                         examples=["'small' = 1, 'medium' = 2, 'large' = 3"]),
        ]

    def get_data_type_info(self, type_name: str) -> Optional[DataTypeInfo]:
        """Look up a type by canonical name or alias; None when unknown."""
        return self.data_types.get(type_name)

    def list_types_by_category(self, category: DataTypeCategory) -> List[DataTypeInfo]:
        """Return each type of ``category`` exactly once, in catalogue order.

        ``data_types`` also holds alias keys pointing at the same objects, so
        only entries stored under their own canonical name are returned;
        otherwise a type would be listed once per alias.
        """
        return [
            dt for key, dt in self.data_types.items()
            if key == dt.name and dt.category == category
        ]

    def get_type_recommendations(self, use_case: str) -> List[Dict[str, Any]]:
        """Suggest types for a free-text use case via keyword substring matching.

        Returns:
            A list of ``{"type": ..., "reason": ...}`` dicts in rule order;
            empty when no keyword matches.
        """
        use_case_lower = use_case.lower()
        recommendations: List[Dict[str, Any]] = []
        for keywords, suggestions in self._RECOMMENDATION_RULES:
            if any(keyword in use_case_lower for keyword in keywords):
                recommendations.extend(
                    {"type": type_name, "reason": reason}
                    for type_name, reason in suggestions
                )
        return recommendations

    def calculate_storage_size(self, type_name: str, value_count: int,
                               avg_string_length: int = 50) -> Dict[str, Any]:
        """Estimate the storage needed for ``value_count`` values of a type.

        Args:
            type_name: canonical name or alias present in the catalogue.
            value_count: number of values.
            avg_string_length: bytes assumed per value for string types.

        Returns:
            A dict with byte/KB/MB totals, or ``{"error": ...}`` for an unknown type.
        """
        data_type = self.get_data_type_info(type_name)
        if not data_type:
            return {"error": "未知的数据类型"}

        if data_type.size_bytes > 0:
            # Fixed-width type.
            bytes_each = data_type.size_bytes
        elif data_type.category == DataTypeCategory.STRING:
            bytes_each = avg_string_length
        elif data_type.category == DataTypeCategory.ARRAY:
            # Assumes an average of 5 elements of 4 bytes each.
            bytes_each = 5 * 4
        else:
            bytes_each = 8  # default estimate for other variable-length types
        total_bytes = bytes_each * value_count

        return {
            "type_name": type_name,
            "value_count": value_count,
            "total_bytes": total_bytes,
            "total_kb": round(total_bytes / 1024, 2),
            "total_mb": round(total_bytes / 1024 / 1024, 2),
            "bytes_per_value": round(total_bytes / value_count, 2) if value_count > 0 else 0,
        }

    def get_type_conversion_matrix(self) -> Dict[str, List[str]]:
        """Return a mapping of source type -> list of target types."""
        return {
            "UInt8": ["UInt16", "UInt32", "UInt64", "Int16", "Int32", "Int64", "Float32", "Float64"],
            "UInt16": ["UInt32", "UInt64", "Int32", "Int64", "Float32", "Float64"],
            "UInt32": ["UInt64", "Int64", "Float64"],
            "UInt64": ["Float64"],
            "Int8": ["Int16", "Int32", "Int64", "Float32", "Float64"],
            "Int16": ["Int32", "Int64", "Float32", "Float64"],
            "Int32": ["Int64", "Float64"],
            "Int64": ["Float64"],
            "Float32": ["Float64"],
            "String": ["FixedString(N)"],
            "Date": ["DateTime", "DateTime64"],
            "DateTime": ["DateTime64"],
        }
# Data type management demo: walks through ClickHouseDataTypeManager.
print("\n=== ClickHouse数据类型管理 ===")
type_manager = ClickHouseDataTypeManager()

# 1. Up to three types per category.
print("\n1. 按分类查看数据类型:")
for category in DataTypeCategory:
    types = type_manager.list_types_by_category(category)
    if types:
        print(f"\n {category.value.upper()}类型:")
        for dt in types[:3]:  # show only the first 3
            size_info = f"{dt.size_bytes}字节" if dt.size_bytes > 0 else "可变长度"
            print(f" - {dt.name}: {dt.description} ({size_info})")

# 2. Details for a handful of types.
print("\n2. 数据类型详细信息:")
detail_types = ["UInt64", "String", "DateTime", "Array(T)"]
for type_name in detail_types:
    type_info = type_manager.get_data_type_info(type_name)
    if type_info:
        print(f"\n {type_name}:")
        print(f" 描述: {type_info.description}")
        print(f" 大小: {type_info.size_bytes if type_info.size_bytes > 0 else '可变'}字节")
        if type_info.min_value is not None:
            print(f" 范围: {type_info.min_value} ~ {type_info.max_value}")
        if type_info.examples:
            print(f" 示例: {type_info.examples[:3]}")

# 3. Recommendations per use case.
print("\n3. 使用场景推荐:")
use_cases = ["用户ID", "创建时间", "商品价格", "访问计数", "用户标签"]
for use_case in use_cases:
    recommendations = type_manager.get_type_recommendations(use_case)
    print(f"\n {use_case}:")
    for rec in recommendations[:2]:  # show only the first 2 recommendations
        print(f" - {rec['type']}: {rec['reason']}")

# 4. Storage estimates as (type, value count, average string length).
print("\n4. 存储大小计算:")
storage_scenarios = [
    ("UInt64", 1000000, 0),
    ("String", 1000000, 100),
    ("DateTime", 1000000, 0),
    ("Array(T)", 100000, 0),
]
for type_name, count, avg_len in storage_scenarios:
    result = type_manager.calculate_storage_size(type_name, count, avg_len)
    if "error" not in result:
        print(f" {type_name} ({count:,}个值): {result['total_mb']}MB")

# 5. First three rows of the conversion matrix.
print("\n5. 类型转换建议:")
conversion_matrix = type_manager.get_type_conversion_matrix()
for source_type, target_types in list(conversion_matrix.items())[:3]:
    print(f" {source_type} 可转换为: {', '.join(target_types[:3])}")
# 2.2.2 复合数据类型
from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Union
import json
class ComplexTypeCategory(Enum):
    """Categories of composite (complex) data types."""

    ARRAY = "array"
    TUPLE = "tuple"
    MAP = "map"
    NESTED = "nested"
    NULLABLE = "nullable"
    LOW_CARDINALITY = "low_cardinality"
    DECIMAL = "decimal"
    ENUM = "enum"
@dataclass
class ComplexTypeDefinition:
    """Descriptive record for one composite data type."""

    # Required descriptive fields.
    name: str
    category: ComplexTypeCategory
    syntax: str
    description: str
    use_cases: List[str]
    examples: List[Dict[str, Any]]

    # Optional notes; each instance gets its own empty list by default.
    performance_notes: List[str] = field(default_factory=list)
    limitations: List[str] = field(default_factory=list)
class ClickHouseComplexTypeManager:
"""ClickHouse复合数据类型管理器"""
def __init__(self):
    """Build the composite type catalogue and keep it on ``complex_types``."""
    catalogue = self._initialize_complex_types()
    self.complex_types = catalogue
def _initialize_complex_types(self) -> Dict[str, ComplexTypeDefinition]:
    """Return the catalogue of composite types, keyed by type name.

    Entries are inserted in this order: Array, Tuple, Map, Nested, Nullable,
    LowCardinality, Decimal, Enum.
    """
    definitions = [
        # Array
        ComplexTypeDefinition(
            name="Array",
            category=ComplexTypeCategory.ARRAY,
            syntax="Array(T)",
            description="一维数组,存储相同类型的多个值",
            use_cases=["存储标签列表", "存储多个ID", "存储时间序列数据点", "存储用户行为序列"],
            examples=[
                {"definition": "Array(String)",
                 "values": ["['tag1', 'tag2', 'tag3']", "['red', 'blue']", "[]"]},
                {"definition": "Array(UInt32)",
                 "values": ["[1, 2, 3, 4, 5]", "[100, 200]", "[]"]},
                {"definition": "Array(Float64)",
                 "values": ["[3.14, 2.71, 1.41]", "[0.5, 1.5, 2.5]"]},
            ],
            performance_notes=["数组元素在内存中连续存储", "支持向量化操作", "查询时可以使用数组函数进行高效处理"],
            limitations=["不支持嵌套数组(Array(Array(T)))", "数组大小有限制", "所有元素必须是相同类型"],
        ),
        # Tuple
        ComplexTypeDefinition(
            name="Tuple",
            category=ComplexTypeCategory.TUPLE,
            syntax="Tuple(T1, T2, ...)",
            description="存储固定数量的不同类型值",
            use_cases=["存储坐标点(x, y)", "存储键值对", "存储复合标识符", "存储多维度指标"],
            examples=[
                {"definition": "Tuple(String, UInt32)",
                 "values": ["('user123', 25)", "('admin', 1)"]},
                {"definition": "Tuple(Float64, Float64)",
                 "values": ["(39.9042, 116.4074)", "(40.7128, -74.0060)"]},
                {"definition": "Tuple(String, UInt32, DateTime)",
                 "values": ["('order', 12345, '2024-01-01 12:00:00')"]},
            ],
            performance_notes=["元组元素紧密打包存储", "访问特定元素效率高", "支持元组解构"],
            limitations=["元素数量和类型在创建时固定", "不支持动态添加或删除元素"],
        ),
        # Map
        ComplexTypeDefinition(
            name="Map",
            category=ComplexTypeCategory.MAP,
            syntax="Map(K, V)",
            description="键值对映射,类似于字典",
            use_cases=["存储用户属性", "存储配置参数", "存储动态字段", "存储标签和值的映射"],
            examples=[
                {"definition": "Map(String, String)",
                 "values": ["{'name': 'John', 'city': 'Beijing'}",
                            "{'color': 'red', 'size': 'large'}"]},
                {"definition": "Map(String, UInt32)",
                 "values": ["{'age': 25, 'score': 95}", "{'count': 100, 'total': 1000}"]},
            ],
            performance_notes=["键值对以数组形式存储", "支持高效的键查找", "压缩效果好"],
            limitations=["键的类型必须支持比较", "不保证键的顺序", "键必须唯一"],
        ),
        # Nested
        ComplexTypeDefinition(
            name="Nested",
            category=ComplexTypeCategory.NESTED,
            syntax="Nested(col1 Type1, col2 Type2, ...)",
            description="嵌套表结构,存储结构化的子记录",
            use_cases=["存储订单商品列表", "存储用户事件详情", "存储层级数据", "存储一对多关系数据"],
            examples=[
                {"definition": "Nested(product_id UInt32, quantity UInt16, price Float64)",
                 "values": ["[(1001, 2, 99.99), (1002, 1, 149.99)]"]},
                {"definition": "Nested(event_type String, timestamp DateTime, value Float64)",
                 "values": ["[('click', '2024-01-01 12:00:00', 1.0), ('view', '2024-01-01 12:01:00', 0.5)]"]},
            ],
            performance_notes=["列式存储嵌套数据", "支持高效的嵌套查询", "压缩效果优秀"],
            limitations=["嵌套层级有限制", "查询语法相对复杂", "不支持嵌套的嵌套"],
        ),
        # Nullable
        ComplexTypeDefinition(
            name="Nullable",
            category=ComplexTypeCategory.NULLABLE,
            syntax="Nullable(T)",
            description="允许NULL值的类型包装器",
            use_cases=["可选字段", "数据清洗场景", "外部数据导入", "兼容性处理"],
            examples=[
                {"definition": "Nullable(String)", "values": ["'hello'", "NULL", "'world'"]},
                {"definition": "Nullable(UInt32)", "values": ["123", "NULL", "456"]},
            ],
            performance_notes=["额外存储NULL标记位", "查询时需要NULL检查", "影响压缩效果"],
            limitations=["性能开销较大", "不建议在高性能场景使用", "增加存储空间"],
        ),
        # LowCardinality
        ComplexTypeDefinition(
            name="LowCardinality",
            category=ComplexTypeCategory.LOW_CARDINALITY,
            syntax="LowCardinality(T)",
            description="低基数类型,适合重复值较多的列",
            use_cases=["状态字段", "分类字段", "枚举值", "国家/地区代码"],
            examples=[
                {"definition": "LowCardinality(String)",
                 "values": ["'active'", "'inactive'", "'pending'", "'active'"]},
                {"definition": "LowCardinality(FixedString(2))",
                 "values": ["'CN'", "'US'", "'JP'", "'CN'"]},
            ],
            performance_notes=["使用字典编码压缩", "查询性能优异", "内存使用效率高"],
            limitations=["只适合低基数数据(<10000个不同值)", "字典大小有限制", "不适合高基数数据"],
        ),
        # Decimal
        ComplexTypeDefinition(
            name="Decimal",
            category=ComplexTypeCategory.DECIMAL,
            syntax="Decimal(P, S) 或 Decimal32(S), Decimal64(S), Decimal128(S)",
            description="定点数类型,用于精确的数值计算",
            use_cases=["货币金额", "财务计算", "精确的数值存储", "科学计算"],
            examples=[
                {"definition": "Decimal(10, 2)", "values": ["123.45", "999.99", "0.01"]},
                {"definition": "Decimal64(4)", "values": ["1234.5678", "0.0001", "9999.9999"]},
            ],
            performance_notes=["避免浮点数精度问题", "计算结果精确", "支持高精度运算"],
            limitations=["精度和范围有限制", "计算性能略低于浮点数", "存储空间较大"],
        ),
        # Enum
        ComplexTypeDefinition(
            name="Enum",
            category=ComplexTypeCategory.ENUM,
            syntax="Enum8('name1' = value1, 'name2' = value2, ...) 或 Enum16(...)",
            description="枚举类型,预定义的命名常量集合",
            use_cases=["状态字段", "类型分类", "选项列表", "固定值集合"],
            examples=[
                {"definition": "Enum8('pending' = 1, 'processing' = 2, 'completed' = 3, 'failed' = 4)",
                 "values": ["'pending'", "'completed'", "'failed'"]},
                {"definition": "Enum16('small' = 100, 'medium' = 200, 'large' = 300, 'xlarge' = 400)",
                 "values": ["'small'", "'large'", "'medium'"]},
            ],
            performance_notes=["内部存储为整数", "查询性能优秀", "内存使用效率高"],
            limitations=["值集合在创建时固定", "修改枚举值需要ALTER TABLE", "Enum8最多256个值,Enum16最多65536个值"],
        ),
    ]
    # Key every definition by its own name, preserving the list order.
    return {definition.name: definition for definition in definitions}
def get_complex_type_info(self, type_name: str) -> Optional[ComplexTypeDefinition]:
    """Look up a complex type definition by its registered name.

    Args:
        type_name: Key into ``self.complex_types`` (for example ``"Array"``).

    Returns:
        The stored definition, or ``None`` when no entry has that name.
    """
    registry = self.complex_types
    return registry.get(type_name, None)
def list_types_by_category(self, category: ComplexTypeCategory) -> List[ComplexTypeDefinition]:
    """Collect every registered complex type that belongs to a category.

    Args:
        category: Category to match against each definition's ``category``.

    Returns:
        Matching definitions in the iteration order of
        ``self.complex_types``; an empty list when nothing matches.
    """
    matching = []
    for definition in self.complex_types.values():
        if definition.category == category:
            matching.append(definition)
    return matching
def get_type_recommendations_for_scenario(self, scenario: str) -> List[Dict[str, Any]]:
    """Suggest complex types for a free-text scenario description.

    The scenario is lower-cased and scanned for keyword substrings. Every
    rule whose keyword list has at least one hit contributes its
    suggestions, in rule order.

    Args:
        scenario: Scenario description in Chinese or English.

    Returns:
        A list of dicts with the keys ``type``, ``reason`` and ``example``;
        empty when no keyword matches.
    """
    # (keywords, ((type, reason, example), ...)) -- order defines output order.
    rules = (
        (
            ("标签", "tag"),
            (
                ("Array(String)", "适合存储可变数量的标签", "['tech', 'news', 'ai']"),
                ("LowCardinality(String)", "如果标签种类有限,可以优化存储和查询性能", "'technology'"),
            ),
        ),
        (
            ("坐标", "位置", "coordinate"),
            (
                ("Tuple(Float64, Float64)", "适合存储经纬度坐标", "(39.9042, 116.4074)"),
            ),
        ),
        (
            ("属性", "配置", "property"),
            (
                ("Map(String, String)", "适合存储动态的键值对属性", "{'color': 'red', 'size': 'large'}"),
            ),
        ),
        (
            ("订单", "商品", "order"),
            (
                (
                    "Nested(product_id UInt32, quantity UInt16, price Decimal(10,2))",
                    "适合存储订单中的商品列表",
                    "[(1001, 2, 99.99), (1002, 1, 149.99)]",
                ),
            ),
        ),
        (
            ("金额", "价格", "money"),
            (
                ("Decimal(10, 2)", "适合精确的货币计算", "123.45"),
            ),
        ),
        (
            ("状态", "类型", "status"),
            (
                ("Enum8('pending' = 1, 'active' = 2, 'inactive' = 3)", "适合预定义的状态值", "'active'"),
                ("LowCardinality(String)", "如果状态值较少且重复度高", "'active'"),
            ),
        ),
    )

    needle = scenario.lower()
    found = []
    for keywords, suggestions in rules:
        if not any(keyword in needle for keyword in keywords):
            continue
        for type_expr, reason, example in suggestions:
            found.append({"type": type_expr, "reason": reason, "example": example})
    return found
def analyze_type_performance(self, type_name: str, data_characteristics: Dict[str, Any]) -> Dict[str, Any]:
    """Rate a complex type against caller-supplied data characteristics.

    Args:
        type_name: Registered complex type name, resolved through
            ``get_complex_type_info``.
        data_characteristics: Optional hints about the data. ``"cardinality"``
            (int) is read for ``LowCardinality``; ``"query_pattern"`` (str) is
            read for ``Array`` and ``Nested``. Other keys are ignored, and
            ``None`` is treated as an empty dict.

    Returns:
        ``{"error": ...}`` when the type is not registered. Otherwise a dict
        with ``type_name``, ``category``, ``performance_score``,
        ``storage_efficiency``, ``query_performance`` and ``recommendations``.
        Types without a dedicated rule keep the initial score of 0 and the
        ``"unknown"`` ratings.
    """
    complex_type = self.get_complex_type_info(type_name)
    if complex_type is None:
        return {"error": "未知的复合类型"}

    characteristics = data_characteristics or {}
    analysis = {
        "type_name": type_name,
        "category": complex_type.category.value,
        "performance_score": 0,
        "storage_efficiency": "unknown",
        "query_performance": "unknown",
        "recommendations": [],
    }

    cardinality = characteristics.get("cardinality")
    query_pattern = characteristics.get("query_pattern", "unknown")

    if type_name == "LowCardinality":
        if not isinstance(cardinality, int):
            # No usable cardinality was supplied: do not claim it is too high,
            # leave the ratings at "unknown" and ask for the missing input.
            analysis["recommendations"].append("未提供基数信息,无法评估LowCardinality的适用性")
        elif cardinality < 10000:
            analysis["performance_score"] = 9
            analysis["storage_efficiency"] = "excellent"
            analysis["query_performance"] = "excellent"
        else:
            analysis["performance_score"] = 3
            analysis["storage_efficiency"] = "poor"
            analysis["recommendations"].append("基数过高,建议使用普通String类型")
    elif type_name == "Array":
        if query_pattern == "element_access":
            analysis["performance_score"] = 8
            analysis["query_performance"] = "good"
        elif query_pattern == "aggregation":
            analysis["performance_score"] = 9
            analysis["query_performance"] = "excellent"
        else:
            analysis["performance_score"] = 7
    elif type_name == "Nested":
        if query_pattern == "nested_query":
            analysis["performance_score"] = 8
            analysis["query_performance"] = "good"
            analysis["storage_efficiency"] = "excellent"
        else:
            analysis["performance_score"] = 6
            analysis["recommendations"].append("考虑是否真的需要嵌套结构")
    elif type_name == "Nullable":
        analysis["performance_score"] = 5
        analysis["storage_efficiency"] = "fair"
        analysis["query_performance"] = "fair"
        analysis["recommendations"].append("尽量避免使用Nullable,考虑使用默认值")

    return analysis
def generate_type_usage_examples(self, type_name: str) -> List[Dict[str, str]]:
    """Build sample SQL statements for a registered complex type.

    One entry is rendered for every value of every example attached to the
    type definition; only the first three entries are returned.

    Args:
        type_name: Registered complex type name.

    Returns:
        Up to three dicts with the keys ``create_table``, ``insert_data``
        and ``query_data``; an empty list when the type is not registered.
    """
    definition = self.get_complex_type_info(type_name)
    if not definition:
        return []

    def render(column_type, literal):
        # Render the three statements for one (column type, value) pair.
        return {
            "create_table": f"CREATE TABLE example_table (\n id UInt32,\n data {column_type}\n) ENGINE = MergeTree() ORDER BY id;",
            "insert_data": f"INSERT INTO example_table VALUES (1, {literal});",
            "query_data": f"SELECT * FROM example_table WHERE data = {literal};",
        }

    rendered = [
        render(sample["definition"], literal)
        for sample in definition.examples
        for literal in sample["values"]
    ]
    return rendered[:3]
# Demo: walk through the complex data type manager and print its output.
print("\n=== ClickHouse复合数据类型管理 ===")
complex_type_manager = ClickHouseComplexTypeManager()

# 1. Overview: list the registered types, grouped by category.
print("\n1. 复合类型概览:")
for category in ComplexTypeCategory:
    types = complex_type_manager.list_types_by_category(category)
    if not types:
        continue  # categories with no registered type are not printed
    print(f"\n {category.value.upper()}类型:")
    for ct in types:
        print(f" - {ct.name}: {ct.description}")

# 2. Details for a hand-picked subset of types.
print("\n2. 详细类型信息:")
detail_types = ["Array", "Tuple", "Map", "LowCardinality"]
for type_name in detail_types:
    type_info = complex_type_manager.get_complex_type_info(type_name)
    if not type_info:
        continue
    print(f"\n {type_name}:")
    print(f" 语法: {type_info.syntax}")
    print(f" 描述: {type_info.description}")
    print(f" 适用场景: {', '.join(type_info.use_cases[:2])}")
    if type_info.examples:
        print(f" 示例: {type_info.examples[0]['definition']}")

# 3. Scenario-driven recommendations (at most two shown per scenario).
print("\n3. 场景推荐:")
scenarios = ["用户标签", "地理坐标", "订单商品", "商品价格", "订单状态"]
for scenario in scenarios:
    recommendations = complex_type_manager.get_type_recommendations_for_scenario(scenario)
    print(f"\n {scenario}:")
    for rec in recommendations[:2]:
        print(f" - {rec['type']}: {rec['reason']}")

# 4. Performance analysis for a few (type, characteristics) pairs.
print("\n4. 性能分析:")
performance_tests = [
    ("LowCardinality", {"cardinality": 100, "data_size": "large"}),
    ("Array", {"query_pattern": "aggregation", "data_size": "medium"}),
    ("Nullable", {"null_ratio": 0.3, "data_size": "large"}),
]
for type_name, characteristics in performance_tests:
    analysis = complex_type_manager.analyze_type_performance(type_name, characteristics)
    if "error" in analysis:
        continue  # unknown types are skipped silently
    print(f"\n {type_name}:")
    print(f" 性能评分: {analysis['performance_score']}/10")
    print(f" 存储效率: {analysis['storage_efficiency']}")
    print(f" 查询性能: {analysis['query_performance']}")
    if analysis['recommendations']:
        print(f" 建议: {analysis['recommendations'][0]}")

# 5. Generated SQL usage examples (first example per type only).
print("\n5. 使用示例:")
for type_name in ["Array", "Tuple"]:
    examples = complex_type_manager.generate_type_usage_examples(type_name)
    if not examples:
        continue
    print(f"\n {type_name} 使用示例:")
    example = examples[0]
    # Only the column list is shown; everything from ENGINE on is cut off.
    print(f" 创建表: {example['create_table'].split('ENGINE')[0].strip()}...")
    print(f" 插入数据: {example['insert_data']}")
总结
本章详细介绍了 ClickHouse 的基础概念,包括:
关键要点
数据库和表管理
- 数据库是表的逻辑容器
- 支持多种表引擎
- 表的生命周期管理
- 分区管理机制
数据类型系统
- 丰富的基础数据类型
- 强大的复合数据类型
- 类型选择对性能的影响
- 场景化的类型推荐
复合数据类型特点
- Array:适合存储列表数据
- Tuple:适合存储结构化数据
- Map:适合存储键值对
- Nested:适合存储嵌套结构
- LowCardinality:优化低基数数据
- Decimal:精确数值计算
- Enum:预定义枚举值
最佳实践
类型选择原则
- 根据数据特征选择合适的类型
- 考虑查询模式和性能需求
- 避免过度使用 Nullable 类型
- 合理使用 LowCardinality 优化
表设计建议
- 合理设计分区策略
- 选择合适的排序键
- 考虑数据压缩需求
- 规划表的生命周期
性能优化
- 了解各类型的性能特征
- 根据查询模式优化数据结构
- 监控和分析类型使用效果
下一步学习
- SQL基础操作:学习 ClickHouse 的 SQL 语法
- 表引擎详解:深入了解各种表引擎特性
- 查询优化:掌握查询性能优化技巧
- 实际应用:在项目中应用所学概念