Overview
This chapter covers the fundamentals of SQL in ClickHouse, including Data Definition Language (DDL), Data Manipulation Language (DML), and Data Query Language (DQL). ClickHouse supports standard SQL syntax and adds many extensions optimized for analytical workloads.
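Before looking at the individual statement categories, here is a minimal sketch of how the SQL in this chapter can be executed from Python. It assumes the third-party clickhouse-connect package and a ClickHouse server on localhost:8123; the demo database and table names are placeholder assumptions for illustration:

import clickhouse_connect  # assumed: pip install clickhouse-connect

# Connect to a local server (host and port are placeholder assumptions)
client = clickhouse_connect.get_client(host="localhost", port=8123)

# DDL/DML statements go through command(); queries go through query()
client.command("CREATE DATABASE IF NOT EXISTS demo")
client.command(
    "CREATE TABLE IF NOT EXISTS demo.t (id UInt64) "
    "ENGINE = MergeTree ORDER BY id"
)
client.command("INSERT INTO demo.t SELECT number FROM numbers(10)")
print(client.query("SELECT count(*) FROM demo.t").result_rows)  # [(10,)]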
DDL (Data Definition Language) Operations
Database Operations
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
import json
class DatabaseEngine(Enum):
"""数据库引擎类型"""
ORDINARY = "Ordinary"
MEMORY = "Memory"
LAZY = "Lazy"
ATOMIC = "Atomic"
MYSQL = "MySQL"
POSTGRESQL = "PostgreSQL"
REPLICATED = "Replicated"
class DDLOperationType(Enum):
"""DDL操作类型"""
CREATE_DATABASE = "create_database"
DROP_DATABASE = "drop_database"
CREATE_TABLE = "create_table"
DROP_TABLE = "drop_table"
ALTER_TABLE = "alter_table"
RENAME_TABLE = "rename_table"
TRUNCATE_TABLE = "truncate_table"
@dataclass
class DatabaseDefinition:
"""数据库定义"""
name: str
engine: DatabaseEngine
comment: str = ""
settings: Dict[str, Any] = None
def __post_init__(self):
if self.settings is None:
self.settings = {}
@dataclass
class ColumnDefinition:
"""列定义"""
name: str
data_type: str
nullable: bool = False
default_value: Any = None
comment: str = ""
codec: str = ""
@dataclass
class TableDefinition:
"""表定义"""
name: str
database: str
columns: List[ColumnDefinition]
engine: str
order_by: List[str]
partition_by: List[str] = None
primary_key: List[str] = None
sample_by: str = ""
settings: Dict[str, Any] = None
comment: str = ""
def __post_init__(self):
if self.partition_by is None:
self.partition_by = []
if self.primary_key is None:
self.primary_key = []
if self.settings is None:
self.settings = {}
@dataclass
class DDLOperation:
"""DDL操作记录"""
operation_type: DDLOperationType
target_name: str
sql_statement: str
execution_time: datetime
success: bool
error_message: str = ""
affected_objects: List[str] = None
def __post_init__(self):
if self.affected_objects is None:
self.affected_objects = []
class ClickHouseDDLManager:
"""ClickHouse DDL操作管理器"""
def __init__(self):
self.databases: Dict[str, DatabaseDefinition] = {}
self.tables: Dict[str, TableDefinition] = {}
self.operation_history: List[DDLOperation] = []
self._initialize_system_databases()
def _initialize_system_databases(self):
"""初始化系统数据库"""
system_databases = [
DatabaseDefinition("system", DatabaseEngine.ORDINARY, "系统数据库"),
DatabaseDefinition("information_schema", DatabaseEngine.MEMORY, "信息模式"),
DatabaseDefinition("INFORMATION_SCHEMA", DatabaseEngine.MEMORY, "信息模式(大写)")
]
for db in system_databases:
self.databases[db.name] = db
def create_database(self, db_def: DatabaseDefinition, if_not_exists: bool = True) -> Dict[str, Any]:
"""创建数据库"""
if db_def.name in self.databases and not if_not_exists:
return {
"success": False,
"error": f"Database {db_def.name} already exists",
"sql": ""
}
        # Build the SQL statement
sql_parts = ["CREATE DATABASE"]
if if_not_exists:
sql_parts.append("IF NOT EXISTS")
sql_parts.append(f"`{db_def.name}`")
if db_def.engine != DatabaseEngine.ORDINARY:
sql_parts.append(f"ENGINE = {db_def.engine.value}")
if db_def.comment:
sql_parts.append(f"COMMENT '{db_def.comment}'")
sql = " ".join(sql_parts) + ";"
        # Record the operation
operation = DDLOperation(
operation_type=DDLOperationType.CREATE_DATABASE,
target_name=db_def.name,
sql_statement=sql,
execution_time=datetime.now(),
success=True
)
self.databases[db_def.name] = db_def
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"database": db_def.name
}
def drop_database(self, database_name: str, if_exists: bool = True) -> Dict[str, Any]:
"""删除数据库"""
if database_name not in self.databases and not if_exists:
return {
"success": False,
"error": f"Database {database_name} does not exist",
"sql": ""
}
        # Build the SQL statement
sql_parts = ["DROP DATABASE"]
if if_exists:
sql_parts.append("IF EXISTS")
sql_parts.append(f"`{database_name}`")
sql = " ".join(sql_parts) + ";"
        # Record the operation
affected_tables = [table_name for table_name, table_def in self.tables.items()
if table_def.database == database_name]
operation = DDLOperation(
operation_type=DDLOperationType.DROP_DATABASE,
target_name=database_name,
sql_statement=sql,
execution_time=datetime.now(),
success=True,
affected_objects=affected_tables
)
        # Remove the database and its tables
if database_name in self.databases:
del self.databases[database_name]
        # Remove all tables under this database
tables_to_remove = [table_name for table_name, table_def in self.tables.items()
if table_def.database == database_name]
for table_name in tables_to_remove:
del self.tables[table_name]
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"affected_tables": affected_tables
}
def create_table(self, table_def: TableDefinition, if_not_exists: bool = True) -> Dict[str, Any]:
"""创建表"""
table_full_name = f"{table_def.database}.{table_def.name}"
if table_full_name in self.tables and not if_not_exists:
return {
"success": False,
"error": f"Table {table_full_name} already exists",
"sql": ""
}
        # Build the SQL statement
sql_parts = ["CREATE TABLE"]
if if_not_exists:
sql_parts.append("IF NOT EXISTS")
sql_parts.append(f"`{table_def.database}`.`{table_def.name}`")
        # Column definitions
column_definitions = []
for col in table_def.columns:
col_def = f"`{col.name}` {col.data_type}"
if col.nullable:
col_def += " NULL"
if col.default_value is not None:
if isinstance(col.default_value, str):
col_def += f" DEFAULT '{col.default_value}'"
else:
col_def += f" DEFAULT {col.default_value}"
if col.comment:
col_def += f" COMMENT '{col.comment}'"
if col.codec:
col_def += f" CODEC({col.codec})"
column_definitions.append(col_def)
sql_parts.append(f"(\n {',\n '.join(column_definitions)}\n)")
        # Engine
sql_parts.append(f"ENGINE = {table_def.engine}")
# ORDER BY
if table_def.order_by:
sql_parts.append(f"ORDER BY ({', '.join(table_def.order_by)})")
# PARTITION BY
if table_def.partition_by:
sql_parts.append(f"PARTITION BY ({', '.join(table_def.partition_by)})")
# PRIMARY KEY
if table_def.primary_key:
sql_parts.append(f"PRIMARY KEY ({', '.join(table_def.primary_key)})")
# SAMPLE BY
if table_def.sample_by:
sql_parts.append(f"SAMPLE BY {table_def.sample_by}")
# SETTINGS
if table_def.settings:
settings_str = ", ".join([f"{k} = {v}" for k, v in table_def.settings.items()])
sql_parts.append(f"SETTINGS {settings_str}")
# COMMENT
if table_def.comment:
sql_parts.append(f"COMMENT '{table_def.comment}'")
sql = " ".join(sql_parts) + ";"
        # Record the operation
operation = DDLOperation(
operation_type=DDLOperationType.CREATE_TABLE,
target_name=table_full_name,
sql_statement=sql,
execution_time=datetime.now(),
success=True
)
self.tables[table_full_name] = table_def
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"table": table_full_name
}
def drop_table(self, database: str, table: str, if_exists: bool = True) -> Dict[str, Any]:
"""删除表"""
table_full_name = f"{database}.{table}"
if table_full_name not in self.tables and not if_exists:
return {
"success": False,
"error": f"Table {table_full_name} does not exist",
"sql": ""
}
        # Build the SQL statement
sql_parts = ["DROP TABLE"]
if if_exists:
sql_parts.append("IF EXISTS")
sql_parts.append(f"`{database}`.`{table}`")
sql = " ".join(sql_parts) + ";"
        # Record the operation
operation = DDLOperation(
operation_type=DDLOperationType.DROP_TABLE,
target_name=table_full_name,
sql_statement=sql,
execution_time=datetime.now(),
success=True
)
if table_full_name in self.tables:
del self.tables[table_full_name]
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"table": table_full_name
}
def alter_table_add_column(self, database: str, table: str, column: ColumnDefinition, after_column: str = "") -> Dict[str, Any]:
"""添加列"""
table_full_name = f"{database}.{table}"
if table_full_name not in self.tables:
return {
"success": False,
"error": f"Table {table_full_name} does not exist",
"sql": ""
}
        # Build the SQL statement
sql_parts = [f"ALTER TABLE `{database}`.`{table}` ADD COLUMN"]
col_def = f"`{column.name}` {column.data_type}"
if column.nullable:
col_def += " NULL"
if column.default_value is not None:
if isinstance(column.default_value, str):
col_def += f" DEFAULT '{column.default_value}'"
else:
col_def += f" DEFAULT {column.default_value}"
if column.comment:
col_def += f" COMMENT '{column.comment}'"
sql_parts.append(col_def)
if after_column:
sql_parts.append(f"AFTER `{after_column}`")
sql = " ".join(sql_parts) + ";"
        # Update the table definition
table_def = self.tables[table_full_name]
if after_column:
            # Locate the position of the specified column
insert_index = len(table_def.columns)
for i, col in enumerate(table_def.columns):
if col.name == after_column:
insert_index = i + 1
break
table_def.columns.insert(insert_index, column)
else:
table_def.columns.append(column)
        # Record the operation
operation = DDLOperation(
operation_type=DDLOperationType.ALTER_TABLE,
target_name=table_full_name,
sql_statement=sql,
execution_time=datetime.now(),
success=True
)
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"table": table_full_name,
"column_added": column.name
}
def alter_table_drop_column(self, database: str, table: str, column_name: str) -> Dict[str, Any]:
"""删除列"""
table_full_name = f"{database}.{table}"
if table_full_name not in self.tables:
return {
"success": False,
"error": f"Table {table_full_name} does not exist",
"sql": ""
}
        # Build the SQL statement
sql = f"ALTER TABLE `{database}`.`{table}` DROP COLUMN `{column_name}`;"
        # Update the table definition
table_def = self.tables[table_full_name]
table_def.columns = [col for col in table_def.columns if col.name != column_name]
        # Record the operation
operation = DDLOperation(
operation_type=DDLOperationType.ALTER_TABLE,
target_name=table_full_name,
sql_statement=sql,
execution_time=datetime.now(),
success=True
)
self.operation_history.append(operation)
return {
"success": True,
"sql": sql,
"table": table_full_name,
"column_dropped": column_name
}
def get_database_list(self) -> List[Dict[str, Any]]:
"""获取数据库列表"""
return [
{
"name": db.name,
"engine": db.engine.value,
"comment": db.comment,
"table_count": len([t for t in self.tables.values() if t.database == db.name])
}
for db in self.databases.values()
]
def get_table_list(self, database: str = "") -> List[Dict[str, Any]]:
"""获取表列表"""
tables = self.tables.values()
if database:
tables = [t for t in tables if t.database == database]
return [
{
"database": table.database,
"name": table.name,
"engine": table.engine,
"column_count": len(table.columns),
"comment": table.comment
}
for table in tables
]
def get_table_schema(self, database: str, table: str) -> Dict[str, Any]:
"""获取表结构"""
table_full_name = f"{database}.{table}"
if table_full_name not in self.tables:
return {"error": f"Table {table_full_name} does not exist"}
table_def = self.tables[table_full_name]
return {
"database": table_def.database,
"table": table_def.name,
"engine": table_def.engine,
"columns": [
{
"name": col.name,
"type": col.data_type,
"nullable": col.nullable,
"default": col.default_value,
"comment": col.comment,
"codec": col.codec
}
for col in table_def.columns
],
"order_by": table_def.order_by,
"partition_by": table_def.partition_by,
"primary_key": table_def.primary_key,
"sample_by": table_def.sample_by,
"settings": table_def.settings,
"comment": table_def.comment
}
def get_operation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
"""获取操作历史"""
recent_operations = self.operation_history[-limit:] if limit > 0 else self.operation_history
return [
{
"operation_type": op.operation_type.value,
"target_name": op.target_name,
"sql_statement": op.sql_statement,
"execution_time": op.execution_time.isoformat(),
"success": op.success,
"error_message": op.error_message,
"affected_objects": op.affected_objects
}
for op in reversed(recent_operations)
]
def generate_create_table_template(self, table_name: str, database: str = "default") -> str:
"""生成创建表模板"""
template = f"""CREATE TABLE IF NOT EXISTS `{database}`.`{table_name}` (
`id` UInt64 COMMENT '主键ID',
`name` String COMMENT '名称',
`status` Enum8('active' = 1, 'inactive' = 0) DEFAULT 'active' COMMENT '状态',
`created_at` DateTime DEFAULT now() COMMENT '创建时间',
`updated_at` DateTime DEFAULT now() COMMENT '更新时间'
) ENGINE = MergeTree()
ORDER BY (id)
PARTITION BY toYYYYMM(created_at)
SETTINGS index_granularity = 8192
COMMENT '示例表';"""
return template
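The manager above only builds SQL strings and records them in memory; it never contacts a server. A minimal sketch of wiring it to a live connection follows, again assuming the clickhouse-connect package and a local server; the execute_ddl helper is hypothetical and not part of the manager:

import clickhouse_connect  # assumed dependency, as in the overview sketch

def execute_ddl(manager: ClickHouseDDLManager, result: Dict[str, Any]) -> None:
    """Hypothetical helper: run SQL produced by the manager on a real server."""
    if not result.get("success") or not result.get("sql"):
        return
    client = clickhouse_connect.get_client(host="localhost")  # assumed local server
    client.command(result["sql"].rstrip(";"))  # a trailing ";" is unnecessary over HTTP

# Usage sketch: execute_ddl(manager, manager.create_database(db_definition))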
# DDL operation examples
print("\n=== ClickHouse DDL Operation Management ===")
ddl_manager = ClickHouseDDLManager()
print("\n1. 数据库操作:")
# 创建数据库
test_db = DatabaseDefinition(
name="analytics",
engine=DatabaseEngine.ATOMIC,
comment="分析数据库"
)
result = ddl_manager.create_database(test_db)
print(f"\n 创建数据库:")
print(f" SQL: {result['sql']}")
print(f" 成功: {result['success']}")
print("\n 数据库列表:")
for db in ddl_manager.get_database_list():
print(f" - {db['name']} ({db['engine']}) - {db['table_count']} 个表")
print("\n2. 表操作:")
# 创建表
user_table = TableDefinition(
name="users",
database="analytics",
columns=[
ColumnDefinition("id", "UInt64", comment="用户ID"),
ColumnDefinition("username", "String", comment="用户名"),
ColumnDefinition("email", "String", comment="邮箱"),
ColumnDefinition("age", "UInt8", nullable=True, comment="年龄"),
ColumnDefinition("created_at", "DateTime", default_value="now()", comment="创建时间"),
ColumnDefinition("status", "Enum8('active' = 1, 'inactive' = 0)", default_value="'active'", comment="状态")
],
engine="MergeTree()",
order_by=["id"],
partition_by=["toYYYYMM(created_at)"],
settings={"index_granularity": 8192},
comment="用户表"
)
result = ddl_manager.create_table(user_table)
print(f"\n 创建表:")
print(f" 表名: {result['table']}")
print(f" 成功: {result['success']}")
print(f" SQL: {result['sql'][:100]}...")
print("\n3. 表结构修改:")
# 添加列
new_column = ColumnDefinition(
name="last_login",
data_type="DateTime",
nullable=True,
comment="最后登录时间"
)
result = ddl_manager.alter_table_add_column("analytics", "users", new_column)
print(f"\n 添加列:")
print(f" SQL: {result['sql']}")
print(f" 成功: {result['success']}")
print("\n4. 表结构查看:")
schema = ddl_manager.get_table_schema("analytics", "users")
print(f"\n 表: {schema['database']}.{schema['table']}")
print(f" 引擎: {schema['engine']}")
print(f" 列数: {len(schema['columns'])}")
print(" 列信息:")
for col in schema['columns']:
    nullable_str = " (nullable)" if col['nullable'] else ""
    default_str = f" default: {col['default']}" if col['default'] else ""
print(f" - {col['name']}: {col['type']}{nullable_str}{default_str} - {col['comment']}")
print("\n5. 操作历史:")
history = ddl_manager.get_operation_history(5)
for i, op in enumerate(history, 1):
print(f"\n {i}. {op['operation_type']} - {op['target_name']}")
print(f" 时间: {op['execution_time']}")
print(f" 成功: {op['success']}")
print(f" SQL: {op['sql_statement'][:80]}...")
print("\n6. 表模板生成:")
template = ddl_manager.generate_create_table_template("example_table", "analytics")
print(f"\n 模板SQL:")
print(f" {template[:200]}...")
DML (Data Manipulation Language) Operations
Data Insertion Operations
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
from datetime import datetime, date
import json
import random
class InsertFormat(Enum):
"""插入数据格式"""
VALUES = "VALUES"
SELECT = "SELECT"
CSV = "CSV"
JSON = "JSON"
TSV = "TSV"
PARQUET = "Parquet"
class DataSource(Enum):
"""数据源类型"""
MANUAL = "manual"
FILE = "file"
URL = "url"
TABLE = "table"
FUNCTION = "function"
@dataclass
class InsertOperation:
"""插入操作定义"""
table_name: str
database: str
format_type: InsertFormat
data_source: DataSource
columns: List[str] = None
data: Any = None
source_path: str = ""
settings: Dict[str, Any] = None
def __post_init__(self):
if self.columns is None:
self.columns = []
if self.settings is None:
self.settings = {}
@dataclass
class BatchInsertConfig:
"""批量插入配置"""
batch_size: int = 1000
max_memory_usage: int = 10000000000 # 10GB
    max_execution_time: int = 3600  # 1 hour
async_insert: bool = False
wait_for_async_insert: bool = True
deduplicate: bool = True
class ClickHouseDMLManager:
"""ClickHouse DML操作管理器"""
def __init__(self):
self.insert_history: List[Dict[str, Any]] = []
self.data_generators = self._initialize_data_generators()
def _initialize_data_generators(self) -> Dict[str, Any]:
"""初始化数据生成器"""
return {
"user_names": ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry"],
"domains": ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "company.com"],
"statuses": ["active", "inactive"],
"countries": ["CN", "US", "JP", "DE", "FR", "GB", "CA", "AU"]
}
def generate_insert_values(self, table_schema: Dict[str, Any], row_count: int = 10) -> List[Dict[str, Any]]:
"""生成插入数据"""
rows = []
for i in range(row_count):
row = {}
for col in table_schema['columns']:
col_name = col['name']
col_type = col['type'].lower()
if 'id' in col_name.lower():
row[col_name] = i + 1
elif 'name' in col_name.lower():
row[col_name] = f"'{random.choice(self.data_generators['user_names'])}{i}"
elif 'email' in col_name.lower():
username = f"user{i}"
domain = random.choice(self.data_generators['domains'])
row[col_name] = f"'{username}@{domain}'"
elif 'age' in col_name.lower():
row[col_name] = random.randint(18, 80) if not col['nullable'] or random.random() > 0.1 else 'NULL'
elif 'status' in col_name.lower():
row[col_name] = f"'{random.choice(self.data_generators['statuses'])}'"
elif 'country' in col_name.lower():
row[col_name] = f"'{random.choice(self.data_generators['countries'])}'"
elif 'created_at' in col_name.lower() or 'datetime' in col_type:
if col.get('default') == 'now()':
row[col_name] = 'now()'
else:
row[col_name] = f"'{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'"
elif 'date' in col_type:
row[col_name] = f"'{date.today().strftime('%Y-%m-%d')}'"
elif col_type.startswith('uint') or col_type.startswith('int'):
row[col_name] = random.randint(1, 1000)
elif col_type.startswith('float') or col_type.startswith('decimal'):
row[col_name] = round(random.uniform(1.0, 1000.0), 2)
elif col_type == 'string':
row[col_name] = f"'sample_text_{i}'"
else:
row[col_name] = f"'default_value_{i}'"
rows.append(row)
return rows
def create_insert_statement(self, operation: InsertOperation, data: List[Dict[str, Any]] = None) -> str:
"""创建插入语句"""
table_ref = f"`{operation.database}`.`{operation.table_name}`"
if operation.format_type == InsertFormat.VALUES:
sql_parts = [f"INSERT INTO {table_ref}"]
if operation.columns:
columns_str = ", ".join([f"`{col}`" for col in operation.columns])
sql_parts.append(f"({columns_str})")
sql_parts.append("VALUES")
if data:
value_rows = []
for row in data:
if operation.columns:
values = [str(row.get(col, 'NULL')) for col in operation.columns]
else:
values = [str(v) for v in row.values()]
value_rows.append(f"({', '.join(values)})")
sql_parts.append(",\n ".join(value_rows))
else:
sql_parts.append("(...)")
elif operation.format_type == InsertFormat.SELECT:
sql_parts = [f"INSERT INTO {table_ref}"]
if operation.columns:
columns_str = ", ".join([f"`{col}`" for col in operation.columns])
sql_parts.append(f"({columns_str})")
if operation.source_path:
sql_parts.append(f"SELECT * FROM {operation.source_path}")
else:
sql_parts.append("SELECT ...")
elif operation.format_type in [InsertFormat.CSV, InsertFormat.JSON, InsertFormat.TSV]:
sql_parts = [f"INSERT INTO {table_ref}"]
sql_parts.append(f"FROM INFILE '{operation.source_path}'")
sql_parts.append(f"FORMAT {operation.format_type.value}")
# 添加设置
if operation.settings:
settings_str = ", ".join([f"{k} = {v}" for k, v in operation.settings.items()])
sql_parts.append(f"SETTINGS {settings_str}")
return " ".join(sql_parts) + ";"
def create_batch_insert(self, operation: InsertOperation, batch_config: BatchInsertConfig,
total_rows: int = 10000) -> List[str]:
"""创建批量插入语句"""
statements = []
        # Compute the number of batches
batch_count = (total_rows + batch_config.batch_size - 1) // batch_config.batch_size
for batch_num in range(batch_count):
start_row = batch_num * batch_config.batch_size
end_row = min(start_row + batch_config.batch_size, total_rows)
batch_size = end_row - start_row
            # Generate the batch data (simulated)
batch_operation = InsertOperation(
table_name=operation.table_name,
database=operation.database,
format_type=operation.format_type,
data_source=operation.data_source,
columns=operation.columns,
settings={
**operation.settings,
"max_memory_usage": batch_config.max_memory_usage,
"max_execution_time": batch_config.max_execution_time
}
)
if batch_config.async_insert:
batch_operation.settings["async_insert"] = 1
batch_operation.settings["wait_for_async_insert"] = 1 if batch_config.wait_for_async_insert else 0
if batch_config.deduplicate:
batch_operation.settings["insert_deduplicate"] = 1
            # Build the statement (simplified); note that SETTINGS precedes VALUES in ClickHouse
            table_ref = f"`{operation.database}`.`{operation.table_name}`"
            sql = f"INSERT INTO {table_ref}"
            if batch_operation.settings:
                settings_str = ", ".join([f"{k} = {v}" for k, v in batch_operation.settings.items()])
                sql += f" SETTINGS {settings_str}"
            sql += f" VALUES /* Batch {batch_num + 1}: {batch_size} rows */;"
statements.append(sql)
return statements
def create_insert_from_file(self, database: str, table: str, file_path: str,
file_format: InsertFormat, columns: List[str] = None) -> str:
"""从文件插入数据"""
table_ref = f"`{database}`.`{table}`"
sql_parts = [f"INSERT INTO {table_ref}"]
if columns:
columns_str = ", ".join([f"`{col}`" for col in columns])
sql_parts.append(f"({columns_str})")
        if file_format == InsertFormat.CSV:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                # SETTINGS must precede FORMAT in INSERT ... FROM INFILE
                "SETTINGS input_format_csv_skip_first_lines = 1",
                "FORMAT CSV"
            ])
        elif file_format == InsertFormat.JSON:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "FORMAT JSONEachRow"
            ])
        elif file_format == InsertFormat.TSV:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "SETTINGS input_format_tsv_skip_first_lines = 1",
                "FORMAT TSV"
            ])
elif file_format == InsertFormat.PARQUET:
sql_parts.extend([
f"FROM INFILE '{file_path}'",
"FORMAT Parquet"
])
return " ".join(sql_parts) + ";"
def create_insert_from_url(self, database: str, table: str, url: str,
file_format: InsertFormat, columns: List[str] = None) -> str:
"""从URL插入数据"""
table_ref = f"`{database}`.`{table}`"
sql_parts = [f"INSERT INTO {table_ref}"]
if columns:
columns_str = ", ".join([f"`{col}`" for col in columns])
sql_parts.append(f"({columns_str})")
sql_parts.append(f"SELECT * FROM url('{url}', '{file_format.value}')")
return " ".join(sql_parts) + ";"
def create_upsert_statement(self, database: str, table: str, data: List[Dict[str, Any]],
key_columns: List[str]) -> str:
"""创建UPSERT语句(使用ReplacingMergeTree)"""
table_ref = f"`{database}`.`{table}`"
        # ClickHouse has no native UPSERT; emulate it with INSERT + OPTIMIZE
insert_sql = f"INSERT INTO {table_ref} VALUES"
value_rows = []
for row in data:
values = [str(v) for v in row.values()]
value_rows.append(f"({', '.join(values)})")
insert_sql += " " + ",\n ".join(value_rows) + ";"
        # Add an OPTIMIZE statement to apply deduplication
optimize_sql = f"OPTIMIZE TABLE {table_ref} FINAL;"
return f"{insert_sql}\n\n-- 优化表以应用去重\n{optimize_sql}"
def analyze_insert_performance(self, operation: InsertOperation, estimated_rows: int) -> Dict[str, Any]:
"""分析插入性能"""
analysis = {
"operation": f"{operation.database}.{operation.table_name}",
"format": operation.format_type.value,
"estimated_rows": estimated_rows,
"performance_score": 0,
"recommendations": [],
"estimated_time": "unknown",
"memory_usage": "unknown"
}
        # Score performance by format
if operation.format_type == InsertFormat.VALUES:
if estimated_rows < 1000:
analysis["performance_score"] = 8
analysis["estimated_time"] = "< 1秒"
elif estimated_rows < 100000:
analysis["performance_score"] = 6
analysis["estimated_time"] = "1-10秒"
analysis["recommendations"].append("考虑使用批量插入")
else:
analysis["performance_score"] = 3
analysis["estimated_time"] = "> 30秒"
analysis["recommendations"].append("建议使用文件导入或批量插入")
elif operation.format_type in [InsertFormat.CSV, InsertFormat.TSV, InsertFormat.PARQUET]:
analysis["performance_score"] = 9
analysis["estimated_time"] = "很快"
analysis["memory_usage"] = "低"
if operation.format_type == InsertFormat.PARQUET:
analysis["recommendations"].append("Parquet格式提供最佳性能")
elif operation.format_type == InsertFormat.JSON:
analysis["performance_score"] = 7
analysis["estimated_time"] = "中等"
analysis["recommendations"].append("JSON解析可能较慢,考虑使用JSONEachRow")
        # Assess data volume
if estimated_rows > 1000000:
analysis["recommendations"].append("大数据量建议使用异步插入")
analysis["recommendations"].append("考虑分区策略优化")
return analysis
def generate_sample_data_sql(self, database: str, table: str, row_count: int = 1000) -> str:
"""生成示例数据插入SQL"""
return f"""-- 生成 {row_count} 行示例数据
INSERT INTO `{database}`.`{table}`
SELECT
number AS id,
concat('user_', toString(number)) AS username,
concat('user_', toString(number), '@example.com') AS email,
18 + (number % 50) AS age,
now() - INTERVAL (number % 365) DAY AS created_at,
if(number % 10 = 0, 'inactive', 'active') AS status
FROM numbers({row_count});"""
def get_insert_statistics(self) -> Dict[str, Any]:
"""获取插入统计信息"""
total_operations = len(self.insert_history)
if total_operations == 0:
return {
"total_operations": 0,
"success_rate": 0,
"average_rows_per_operation": 0,
"most_used_format": "none",
"total_rows_inserted": 0
}
successful_ops = sum(1 for op in self.insert_history if op.get('success', False))
total_rows = sum(op.get('rows_affected', 0) for op in self.insert_history)
format_counts = {}
for op in self.insert_history:
fmt = op.get('format', 'unknown')
format_counts[fmt] = format_counts.get(fmt, 0) + 1
most_used_format = max(format_counts.items(), key=lambda x: x[1])[0] if format_counts else "none"
return {
"total_operations": total_operations,
"success_rate": (successful_ops / total_operations) * 100,
"average_rows_per_operation": total_rows / total_operations if total_operations > 0 else 0,
"most_used_format": most_used_format,
"total_rows_inserted": total_rows,
"format_distribution": format_counts
}
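generate_insert_values above assembles SQL string literals by hand, which is fine for demo output but fragile in practice (quoting, escaping, injection). A driver-level insert avoids string assembly entirely; a sketch assuming clickhouse-connect, a reachable local server, and the analytics.users table from the DDL section:

import clickhouse_connect  # assumed dependency

client = clickhouse_connect.get_client(host="localhost")  # assumed local server
rows = [
    (1, "alice", "alice@example.com", 30, "active"),
    (2, "bob", "bob@example.com", 25, "inactive"),
]
# The driver handles typing and escaping, so no manual quoting is needed
client.insert(
    "users",
    rows,
    database="analytics",
    column_names=["id", "username", "email", "age", "status"],
)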
# DML operation examples
print("\n=== ClickHouse DML Operation Management ===")
dml_manager = ClickHouseDMLManager()
# Simulated table schema
user_table_schema = {
'database': 'analytics',
'table': 'users',
'columns': [
{'name': 'id', 'type': 'UInt64', 'nullable': False},
{'name': 'username', 'type': 'String', 'nullable': False},
{'name': 'email', 'type': 'String', 'nullable': False},
{'name': 'age', 'type': 'UInt8', 'nullable': True},
{'name': 'created_at', 'type': 'DateTime', 'nullable': False, 'default': 'now()'},
{'name': 'status', 'type': 'Enum8', 'nullable': False}
]
}
print("\n1. 基本插入操作:")
# VALUES插入
values_operation = InsertOperation(
table_name="users",
database="analytics",
format_type=InsertFormat.VALUES,
data_source=DataSource.MANUAL,
columns=["id", "username", "email", "age", "status"]
)
# Generate sample data
sample_data = dml_manager.generate_insert_values(user_table_schema, 5)
insert_sql = dml_manager.create_insert_statement(values_operation, sample_data)
print(f"\n VALUES插入:")
print(f" SQL: {insert_sql[:150]}...")
print("\n2. 文件导入操作:")
# CSV文件导入
csv_sql = dml_manager.create_insert_from_file(
"analytics", "users", "/data/users.csv", InsertFormat.CSV,
["username", "email", "age", "status"]
)
print(f"\n CSV导入:")
print(f" SQL: {csv_sql}")
# Import from a JSON file
json_sql = dml_manager.create_insert_from_file(
"analytics", "users", "/data/users.json", InsertFormat.JSON
)
print(f"\n JSON导入:")
print(f" SQL: {json_sql}")
print("\n3. URL数据导入:")
url_sql = dml_manager.create_insert_from_url(
"analytics", "users",
"https://example.com/data/users.csv",
InsertFormat.CSV
)
print(f"\n URL导入:")
print(f" SQL: {url_sql}")
print("\n4. 批量插入配置:")
batch_config = BatchInsertConfig(
batch_size=5000,
async_insert=True,
wait_for_async_insert=True,
deduplicate=True
)
batch_statements = dml_manager.create_batch_insert(values_operation, batch_config, 20000)
print(f"\n 批量插入 (20000行, 批次大小: {batch_config.batch_size}):")
print(f" 批次数量: {len(batch_statements)}")
for i, stmt in enumerate(batch_statements[:3], 1):
print(f" 批次 {i}: {stmt[:100]}...")
print("\n5. 性能分析:")
performance_tests = [
(InsertFormat.VALUES, 500),
(InsertFormat.VALUES, 50000),
(InsertFormat.CSV, 100000),
(InsertFormat.PARQUET, 1000000)
]
for fmt, rows in performance_tests:
test_op = InsertOperation(
table_name="users",
database="analytics",
format_type=fmt,
data_source=DataSource.FILE
)
analysis = dml_manager.analyze_insert_performance(test_op, rows)
print(f"\n {fmt.value} ({rows:,} 行):")
print(f" 性能评分: {analysis['performance_score']}/10")
print(f" 预估时间: {analysis['estimated_time']}")
if analysis['recommendations']:
print(f" 建议: {analysis['recommendations'][0]}")
print("\n6. 示例数据生成:")
sample_sql = dml_manager.generate_sample_data_sql("analytics", "users", 10000)
print(f"\n 生成10000行示例数据:")
print(f" {sample_sql[:200]}...")
print("\n7. UPSERT操作:")
upsert_data = [
{'id': 1, 'username': "'alice_updated'", 'email': "'alice@newdomain.com'", 'age': 26, 'status': "'active'"},
{'id': 2, 'username': "'bob_updated'", 'email': "'bob@newdomain.com'", 'age': 31, 'status': "'inactive'"}
]
upsert_sql = dml_manager.create_upsert_statement("analytics", "users", upsert_data, ["id"])
print(f"\n UPSERT操作:")
print(f" {upsert_sql[:150]}...")
# Simulated insert history
dml_manager.insert_history = [
{'format': 'VALUES', 'success': True, 'rows_affected': 1000},
{'format': 'CSV', 'success': True, 'rows_affected': 50000},
{'format': 'JSON', 'success': False, 'rows_affected': 0},
{'format': 'PARQUET', 'success': True, 'rows_affected': 100000}
]
print("\n8. 插入统计:")
stats = dml_manager.get_insert_statistics()
print(f"\n 总操作数: {stats['total_operations']}")
print(f" 成功率: {stats['success_rate']:.1f}%")
print(f" 平均每次插入行数: {stats['average_rows_per_operation']:,.0f}")
print(f" 最常用格式: {stats['most_used_format']}")
print(f" 总插入行数: {stats['total_rows_inserted']:,}")
print(f" 格式分布: {stats['format_distribution']}")
DQL (Data Query Language) Operations
Basic Query Operations
from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
from datetime import datetime, timedelta
import json
class QueryType(Enum):
"""查询类型"""
SELECT = "SELECT"
AGGREGATE = "AGGREGATE"
WINDOW = "WINDOW"
SUBQUERY = "SUBQUERY"
CTE = "CTE"
UNION = "UNION"
JOIN = "JOIN"
class JoinType(Enum):
"""连接类型"""
INNER = "INNER JOIN"
LEFT = "LEFT JOIN"
RIGHT = "RIGHT JOIN"
FULL = "FULL OUTER JOIN"
CROSS = "CROSS JOIN"
ASOF = "ASOF JOIN"
ARRAY = "ARRAY JOIN"
class AggregateFunction(Enum):
"""聚合函数"""
COUNT = "count"
SUM = "sum"
AVG = "avg"
MIN = "min"
MAX = "max"
UNIQ = "uniq"
UNIQ_EXACT = "uniqExact"
GROUP_ARRAY = "groupArray"
GROUP_CONCAT = "groupConcat"
QUANTILE = "quantile"
MEDIAN = "median"
STDDEV = "stddevPop"
VAR = "varPop"
class WindowFunction(Enum):
"""窗口函数"""
ROW_NUMBER = "row_number"
RANK = "rank"
DENSE_RANK = "dense_rank"
LAG = "lag"
LEAD = "lead"
FIRST_VALUE = "first_value"
LAST_VALUE = "last_value"
NTH_VALUE = "nth_value"
PERCENT_RANK = "percent_rank"
CUME_DIST = "cume_dist"
@dataclass
class SelectClause:
"""SELECT子句定义"""
columns: List[str]
distinct: bool = False
limit: Optional[int] = None
offset: Optional[int] = None
@dataclass
class FromClause:
"""FROM子句定义"""
table_name: str
database: str = ""
alias: str = ""
sample_ratio: Optional[float] = None
@dataclass
class JoinClause:
"""JOIN子句定义"""
join_type: JoinType
table: FromClause
on_condition: str
using_columns: List[str] = None
def __post_init__(self):
if self.using_columns is None:
self.using_columns = []
@dataclass
class WhereClause:
"""WHERE子句定义"""
conditions: List[str]
@dataclass
class GroupByClause:
"""GROUP BY子句定义"""
columns: List[str]
with_rollup: bool = False
with_cube: bool = False
with_totals: bool = False
@dataclass
class OrderByClause:
"""ORDER BY子句定义"""
columns: List[str]
directions: List[str] = None # ASC, DESC
def __post_init__(self):
if self.directions is None:
self.directions = ["ASC"] * len(self.columns)
@dataclass
class QueryDefinition:
"""查询定义"""
select_clause: SelectClause
from_clause: FromClause
joins: List[JoinClause] = None
where_clause: Optional[WhereClause] = None
group_by_clause: Optional[GroupByClause] = None
having_clause: Optional[WhereClause] = None
order_by_clause: Optional[OrderByClause] = None
query_type: QueryType = QueryType.SELECT
settings: Dict[str, Any] = None
def __post_init__(self):
if self.joins is None:
self.joins = []
if self.settings is None:
self.settings = {}
class ClickHouseDQLManager:
"""ClickHouse DQL操作管理器"""
def __init__(self):
self.query_history: List[Dict[str, Any]] = []
self.common_functions = self._initialize_common_functions()
def _initialize_common_functions(self) -> Dict[str, List[str]]:
"""初始化常用函数"""
return {
"string_functions": [
"length", "lower", "upper", "substring", "concat", "replace",
"trim", "ltrim", "rtrim", "split", "extract", "match"
],
"date_functions": [
"now", "today", "yesterday", "toDate", "toDateTime", "formatDateTime",
"addDays", "addMonths", "addYears", "dateDiff", "toYYYYMM", "toStartOfMonth"
],
"math_functions": [
"abs", "round", "floor", "ceil", "sqrt", "pow", "log", "exp",
"sin", "cos", "tan", "greatest", "least", "rand"
],
"array_functions": [
"arrayJoin", "arrayElement", "arrayLength", "arrayConcat", "arraySlice",
"arrayUniq", "arraySort", "arrayReverse", "arrayFilter", "arrayMap"
],
"conditional_functions": [
"if", "multiIf", "case", "coalesce", "nullIf", "assumeNotNull"
]
}
def build_select_query(self, query_def: QueryDefinition) -> str:
"""构建SELECT查询"""
sql_parts = []
        # SELECT clause
select_part = "SELECT"
if query_def.select_clause.distinct:
select_part += " DISTINCT"
columns_str = ", ".join(query_def.select_clause.columns)
sql_parts.append(f"{select_part} {columns_str}")
        # FROM clause
from_part = "FROM"
if query_def.from_clause.database:
table_ref = f"`{query_def.from_clause.database}`.`{query_def.from_clause.table_name}`"
else:
table_ref = f"`{query_def.from_clause.table_name}`"
if query_def.from_clause.alias:
table_ref += f" AS {query_def.from_clause.alias}"
if query_def.from_clause.sample_ratio:
table_ref += f" SAMPLE {query_def.from_clause.sample_ratio}"
sql_parts.append(f"{from_part} {table_ref}")
        # JOIN clauses
for join in query_def.joins:
join_part = join.join_type.value
if join.table.database:
join_table = f"`{join.table.database}`.`{join.table.table_name}`"
else:
join_table = f"`{join.table.table_name}`"
if join.table.alias:
join_table += f" AS {join.table.alias}"
join_part += f" {join_table}"
if join.using_columns:
using_cols = ", ".join([f"`{col}`" for col in join.using_columns])
join_part += f" USING ({using_cols})"
elif join.on_condition:
join_part += f" ON {join.on_condition}"
sql_parts.append(join_part)
        # WHERE clause
if query_def.where_clause:
where_conditions = " AND ".join(query_def.where_clause.conditions)
sql_parts.append(f"WHERE {where_conditions}")
        # GROUP BY clause
if query_def.group_by_clause:
group_by_part = "GROUP BY " + ", ".join(query_def.group_by_clause.columns)
if query_def.group_by_clause.with_rollup:
group_by_part += " WITH ROLLUP"
elif query_def.group_by_clause.with_cube:
group_by_part += " WITH CUBE"
if query_def.group_by_clause.with_totals:
group_by_part += " WITH TOTALS"
sql_parts.append(group_by_part)
        # HAVING clause
if query_def.having_clause:
having_conditions = " AND ".join(query_def.having_clause.conditions)
sql_parts.append(f"HAVING {having_conditions}")
        # ORDER BY clause
if query_def.order_by_clause:
order_items = []
for i, col in enumerate(query_def.order_by_clause.columns):
direction = query_def.order_by_clause.directions[i] if i < len(query_def.order_by_clause.directions) else "ASC"
order_items.append(f"{col} {direction}")
sql_parts.append(f"ORDER BY {', '.join(order_items)}")
        # LIMIT clause
if query_def.select_clause.limit:
limit_part = f"LIMIT {query_def.select_clause.limit}"
if query_def.select_clause.offset:
limit_part += f" OFFSET {query_def.select_clause.offset}"
sql_parts.append(limit_part)
# SETTINGS
if query_def.settings:
settings_str = ", ".join([f"{k} = {v}" for k, v in query_def.settings.items()])
sql_parts.append(f"SETTINGS {settings_str}")
return "\n".join(sql_parts) + ";"
def create_aggregate_query(self, table: str, database: str,
aggregate_columns: Dict[str, AggregateFunction],
group_by_columns: List[str] = None,
where_conditions: List[str] = None) -> str:
"""创建聚合查询"""
# 构建聚合列
select_columns = []
if group_by_columns:
select_columns.extend(group_by_columns)
for col_name, agg_func in aggregate_columns.items():
if agg_func == AggregateFunction.COUNT:
if col_name == "*":
select_columns.append("count(*) AS total_count")
else:
select_columns.append(f"count(`{col_name}`) AS {col_name}_count")
elif agg_func == AggregateFunction.UNIQ:
select_columns.append(f"uniq(`{col_name}`) AS {col_name}_uniq")
elif agg_func == AggregateFunction.GROUP_ARRAY:
select_columns.append(f"groupArray(`{col_name}`) AS {col_name}_array")
else:
select_columns.append(f"{agg_func.value}(`{col_name}`) AS {col_name}_{agg_func.value}")
        # Build the query definition
query_def = QueryDefinition(
select_clause=SelectClause(columns=select_columns),
from_clause=FromClause(table_name=table, database=database),
query_type=QueryType.AGGREGATE
)
if where_conditions:
query_def.where_clause = WhereClause(conditions=where_conditions)
if group_by_columns:
query_def.group_by_clause = GroupByClause(columns=group_by_columns)
return self.build_select_query(query_def)
def create_window_query(self, table: str, database: str,
window_functions: Dict[str, WindowFunction],
partition_by: List[str] = None,
order_by: List[str] = None,
where_conditions: List[str] = None) -> str:
"""创建窗口函数查询"""
select_columns = ["*"] # 包含原始列
# 构建窗口函数
for alias, win_func in window_functions.items():
window_part = f"{win_func.value}()"
over_clause_parts = []
if partition_by:
over_clause_parts.append(f"PARTITION BY {', '.join(partition_by)}")
if order_by:
over_clause_parts.append(f"ORDER BY {', '.join(order_by)}")
if over_clause_parts:
window_part += f" OVER ({' '.join(over_clause_parts)})"
else:
window_part += " OVER ()"
select_columns.append(f"{window_part} AS {alias}")
        # Build the query definition
query_def = QueryDefinition(
select_clause=SelectClause(columns=select_columns),
from_clause=FromClause(table_name=table, database=database),
query_type=QueryType.WINDOW
)
if where_conditions:
query_def.where_clause = WhereClause(conditions=where_conditions)
return self.build_select_query(query_def)
def create_join_query(self, main_table: FromClause, join_tables: List[JoinClause],
select_columns: List[str], where_conditions: List[str] = None) -> str:
"""创建连接查询"""
query_def = QueryDefinition(
select_clause=SelectClause(columns=select_columns),
from_clause=main_table,
joins=join_tables,
query_type=QueryType.JOIN
)
if where_conditions:
query_def.where_clause = WhereClause(conditions=where_conditions)
return self.build_select_query(query_def)
def create_cte_query(self, cte_definitions: Dict[str, str], main_query: str) -> str:
"""创建CTE(公共表表达式)查询"""
cte_parts = []
for cte_name, cte_sql in cte_definitions.items():
cte_parts.append(f"{cte_name} AS (\n {cte_sql}\n)")
cte_clause = "WITH " + ",\n".join(cte_parts)
return f"{cte_clause}\n\n{main_query}"
def create_union_query(self, queries: List[str], union_all: bool = False) -> str:
"""创建UNION查询"""
union_operator = "UNION ALL" if union_all else "UNION"
return f"\n{union_operator}\n".join(queries)
def generate_sample_queries(self, database: str, table: str) -> Dict[str, str]:
"""生成示例查询"""
queries = {}
        # Basic query
        queries["basic_select"] = f"""-- Basic query
SELECT *
FROM `{database}`.`{table}`
LIMIT 10;"""
        # Filtered query
        queries["conditional_select"] = f"""-- Filtered query
SELECT id, username, email, age
FROM `{database}`.`{table}`
WHERE age > 25
AND status = 'active'
AND created_at >= today() - INTERVAL 30 DAY
ORDER BY created_at DESC
LIMIT 100;"""
        # Aggregate query
        queries["aggregate_query"] = f"""-- Aggregate query
SELECT
status,
count(*) AS user_count,
avg(age) AS avg_age,
min(age) AS min_age,
max(age) AS max_age,
uniq(email) AS unique_emails
FROM `{database}`.`{table}`
GROUP BY status
ORDER BY user_count DESC;"""
        # Time-series query
        queries["time_series"] = f"""-- Time-series query
SELECT
toYYYYMM(created_at) AS month,
count(*) AS new_users,
uniq(email) AS unique_users,
avg(age) AS avg_age
FROM `{database}`.`{table}`
WHERE created_at >= today() - INTERVAL 12 MONTH
GROUP BY month
ORDER BY month;"""
        # Window-function query
        queries["window_function"] = f"""-- Window-function query
SELECT
id,
username,
age,
status,
row_number() OVER (PARTITION BY status ORDER BY age DESC) AS rank_in_status,
lag(age, 1) OVER (ORDER BY created_at) AS prev_user_age,
avg(age) OVER (PARTITION BY status) AS avg_age_by_status
FROM `{database}`.`{table}`
ORDER BY status, rank_in_status;"""
        # Complex query
        queries["complex_query"] = f"""-- Complex query (CTE + window functions)
WITH user_stats AS (
SELECT
status,
count(*) AS user_count,
avg(age) AS avg_age
FROM `{database}`.`{table}`
GROUP BY status
),
user_rankings AS (
SELECT
id,
username,
age,
status,
dense_rank() OVER (PARTITION BY status ORDER BY age DESC) AS age_rank
FROM `{database}`.`{table}`
)
SELECT
ur.status,
ur.username,
ur.age,
ur.age_rank,
us.user_count,
us.avg_age,
(ur.age - us.avg_age) AS age_diff_from_avg
FROM user_rankings ur
JOIN user_stats us ON ur.status = us.status
WHERE ur.age_rank <= 5
ORDER BY ur.status, ur.age_rank;"""
return queries
def analyze_query_performance(self, query: str) -> Dict[str, Any]:
"""分析查询性能"""
analysis = {
"query_type": "unknown",
"complexity_score": 0,
"performance_score": 0,
"recommendations": [],
"potential_issues": []
}
query_lower = query.lower()
        # Determine the query type
if "group by" in query_lower:
analysis["query_type"] = "aggregate"
analysis["complexity_score"] += 3
if "join" in query_lower:
analysis["query_type"] = "join"
analysis["complexity_score"] += 4
join_count = query_lower.count("join")
if join_count > 3:
analysis["potential_issues"].append(f"多表连接({join_count}个)可能影响性能")
if "over (" in query_lower:
analysis["query_type"] = "window"
analysis["complexity_score"] += 5
if "with " in query_lower:
analysis["complexity_score"] += 2
analysis["recommendations"].append("CTE可以提高查询可读性")
        # Performance checks
if "select *" in query_lower:
analysis["performance_score"] -= 2
analysis["potential_issues"].append("SELECT * 可能影响性能")
analysis["recommendations"].append("明确指定需要的列")
if "limit" not in query_lower and "count(" not in query_lower:
analysis["performance_score"] -= 1
analysis["recommendations"].append("考虑添加LIMIT限制结果集大小")
if "where" in query_lower:
analysis["performance_score"] += 2
else:
analysis["potential_issues"].append("缺少WHERE条件可能导致全表扫描")
if "order by" in query_lower and "limit" not in query_lower:
analysis["potential_issues"].append("ORDER BY without LIMIT可能消耗大量内存")
        # Compute the final performance score
base_score = 7
analysis["performance_score"] = max(1, min(10, base_score + analysis["performance_score"]))
return analysis
def get_function_examples(self, function_category: str) -> List[Dict[str, str]]:
"""获取函数使用示例"""
examples = []
if function_category == "string_functions":
examples = [
{"function": "length", "example": "SELECT length('ClickHouse') AS len", "result": "10"},
{"function": "concat", "example": "SELECT concat('Hello', ' ', 'World') AS greeting", "result": "Hello World"},
{"function": "substring", "example": "SELECT substring('ClickHouse', 1, 5) AS sub", "result": "Click"},
{"function": "replace", "example": "SELECT replace('Hello World', 'World', 'ClickHouse') AS replaced", "result": "Hello ClickHouse"}
]
elif function_category == "date_functions":
examples = [
{"function": "now", "example": "SELECT now() AS current_time", "result": "2024-01-15 10:30:00"},
{"function": "toYYYYMM", "example": "SELECT toYYYYMM(now()) AS year_month", "result": "202401"},
{"function": "addDays", "example": "SELECT addDays(today(), 7) AS next_week", "result": "2024-01-22"},
{"function": "dateDiff", "example": "SELECT dateDiff('day', '2024-01-01', '2024-01-15') AS days", "result": "14"}
]
elif function_category == "math_functions":
examples = [
{"function": "round", "example": "SELECT round(3.14159, 2) AS rounded", "result": "3.14"},
{"function": "greatest", "example": "SELECT greatest(10, 20, 5) AS max_val", "result": "20"},
{"function": "abs", "example": "SELECT abs(-42) AS absolute", "result": "42"},
{"function": "sqrt", "example": "SELECT sqrt(16) AS square_root", "result": "4"}
]
elif function_category == "array_functions":
examples = [
{"function": "arrayLength", "example": "SELECT arrayLength([1,2,3,4]) AS len", "result": "4"},
{"function": "arrayElement", "example": "SELECT arrayElement([1,2,3], 2) AS element", "result": "2"},
{"function": "arrayConcat", "example": "SELECT arrayConcat([1,2], [3,4]) AS combined", "result": "[1,2,3,4]"},
{"function": "arrayUniq", "example": "SELECT arrayUniq([1,2,2,3]) AS unique_count", "result": "3"}
]
return examples
# DQL operation examples
print("\n=== ClickHouse DQL Operation Management ===")
dql_manager = ClickHouseDQLManager()
print("\n1. 基础查询构建:")
# 简单查询
simple_query = QueryDefinition(
select_clause=SelectClause(columns=["id", "username", "email", "age"]),
from_clause=FromClause(table_name="users", database="analytics"),
where_clause=WhereClause(conditions=["age > 25", "status = 'active'"]),
order_by_clause=OrderByClause(columns=["created_at"], directions=["DESC"]),
query_type=QueryType.SELECT
)
simple_sql = dql_manager.build_select_query(simple_query)
print(f"\n 简单查询:")
print(f" {simple_sql[:150]}...")
print("\n2. 聚合查询:")
agg_sql = dql_manager.create_aggregate_query(
table="users",
database="analytics",
aggregate_columns={
"*": AggregateFunction.COUNT,
"age": AggregateFunction.AVG,
"email": AggregateFunction.UNIQ
},
group_by_columns=["status"],
where_conditions=["created_at >= today() - INTERVAL 30 DAY"]
)
print(f"\n 聚合查询:")
print(f" {agg_sql[:200]}...")
print("\n3. 窗口函数查询:")
window_sql = dql_manager.create_window_query(
table="users",
database="analytics",
window_functions={
"row_num": WindowFunction.ROW_NUMBER,
"age_rank": WindowFunction.RANK
},
partition_by=["status"],
order_by=["age DESC"]
)
print(f"\n 窗口函数查询:")
print(f" {window_sql[:200]}...")
print("\n4. 连接查询:")
# 定义连接表
orders_table = FromClause(table_name="orders", database="analytics", alias="o")
user_join = JoinClause(
join_type=JoinType.INNER,
table=FromClause(table_name="users", database="analytics", alias="u"),
on_condition="o.user_id = u.id"
)
join_sql = dql_manager.create_join_query(
main_table=orders_table,
join_tables=[user_join],
select_columns=["o.id", "o.total_amount", "u.username", "u.email"],
where_conditions=["o.created_at >= today() - INTERVAL 7 DAY"]
)
print(f"\n 连接查询:")
print(f" {join_sql[:200]}...")
print("\n5. CTE查询:")
cte_definitions = {
"active_users": "SELECT * FROM analytics.users WHERE status = 'active'",
"recent_orders": "SELECT * FROM analytics.orders WHERE created_at >= today() - INTERVAL 30 DAY"
}
main_query = "SELECT au.username, COUNT(ro.id) as order_count FROM active_users au LEFT JOIN recent_orders ro ON au.id = ro.user_id GROUP BY au.username"
cte_sql = dql_manager.create_cte_query(cte_definitions, main_query)
print(f"\n CTE查询:")
print(f" {cte_sql[:200]}...")
print("\n6. 示例查询生成:")
sample_queries = dql_manager.generate_sample_queries("analytics", "users")
for query_name, query_sql in list(sample_queries.items())[:3]:
print(f"\n {query_name}:")
print(f" {query_sql[:150]}...")
print("\n7. 查询性能分析:")
test_queries = [
"SELECT * FROM users",
"SELECT id, username FROM users WHERE age > 25 LIMIT 100",
"SELECT status, COUNT(*) FROM users GROUP BY status",
"SELECT u.*, o.total FROM users u JOIN orders o ON u.id = o.user_id"
]
for i, query in enumerate(test_queries, 1):
analysis = dql_manager.analyze_query_performance(query)
print(f"\n 查询 {i}: {query[:50]}...")
print(f" 类型: {analysis['query_type']}")
print(f" 复杂度: {analysis['complexity_score']}/10")
print(f" 性能评分: {analysis['performance_score']}/10")
if analysis['recommendations']:
print(f" 建议: {analysis['recommendations'][0]}")
if analysis['potential_issues']:
print(f" 潜在问题: {analysis['potential_issues'][0]}")
print("\n8. 函数使用示例:")
for category in ["string_functions", "date_functions"]:
examples = dql_manager.get_function_examples(category)
print(f"\n {category.replace('_', ' ').title()}:")
for example in examples[:2]:
print(f" {example['function']}: {example['example']} → {example['result']}")
print("\n9. UNION查询:")
union_queries = [
"SELECT 'active' as type, COUNT(*) as count FROM analytics.users WHERE status = 'active'",
"SELECT 'inactive' as type, COUNT(*) as count FROM analytics.users WHERE status = 'inactive'"
]
union_sql = dql_manager.create_union_query(union_queries, union_all=False)
print(f"\n UNION查询:")
print(f" {union_sql[:200]}...")
print("\n10. 采样查询:")
sample_query = QueryDefinition(
select_clause=SelectClause(columns=["status", "COUNT(*) as sample_count"]),
from_clause=FromClause(table_name="users", database="analytics", sample_ratio=0.1),
group_by_clause=GroupByClause(columns=["status"])
)
sample_sql = dql_manager.build_select_query(sample_query)
print(f"\n 采样查询 (10%):")
print(f" {sample_sql}")
Summary
Key Takeaways
DDL operations
- Creating, altering, and dropping databases and tables
- Dynamic adjustment of table structures
- Support for multiple table engines and partitioning strategies
DML operations
- Support for multiple insert formats (VALUES, CSV, JSON, Parquet, and more)
- Batch and streaming insert optimizations
- The ability to import data from external sources
DQL operations
- Powerful SQL querying capabilities
- A rich set of aggregate and window functions
- High-performance joins and subqueries
- Advanced query features such as CTEs and UNION
Best Practices
Table design (see the sketch after this list)
- Choose an appropriate table engine
- Design partition keys and sorting keys carefully
- Consider data compression and storage optimization
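A sketch that applies these table-design guidelines in a single DDL statement; the table, columns, and codec choice are illustrative assumptions, not a prescription:

# Hypothetical events table: monthly partitions, low-cardinality dictionary
# encoding for the type column, and ZSTD compression for the large payload
events_ddl = """
CREATE TABLE IF NOT EXISTS `analytics`.`events` (
    `event_date` Date,
    `user_id` UInt64,
    `event_type` LowCardinality(String),
    `payload` String CODEC(ZSTD(3))
) ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_date);
"""
print(events_ddl)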
Data insertion
- Use batch inserts to improve performance
- Choose an appropriate data format
- Avoid frequent small-batch inserts
Query optimization
- Specify only the columns you need; avoid SELECT *
- Filter data with well-chosen WHERE conditions
- Use LIMIT to bound result sets where appropriate
- Use sampling queries for data exploration
Performance monitoring (see the sketch after this list)
- Analyze query execution plans
- Monitor query performance metrics
- Identify and optimize slow queries
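A sketch of basic plan inspection, assuming clickhouse-connect and a live server as in the earlier sketches; EXPLAIN and the system.query_log table are standard ClickHouse features:

import clickhouse_connect  # assumed dependency

client = clickhouse_connect.get_client(host="localhost")  # assumed local server
# EXPLAIN prints the logical plan; EXPLAIN PIPELINE would show the execution pipeline
plan = client.query(
    "EXPLAIN SELECT status, count(*) FROM analytics.users GROUP BY status"
)
for line in plan.result_rows:
    print(line[0])
# Finished queries (duration, rows read, memory) are recorded in system.query_log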
Next Steps
- Table engines in depth - explore the characteristics and use cases of each table engine
- Query optimization - learn query performance tuning techniques and best practices
- Advanced data types - master advanced usage of complex data types
- Practical applications - apply ClickHouse to data analysis in real projects
With the material in this chapter you have the fundamentals of ClickHouse SQL in hand, a solid base for the more advanced topics that follow.