Overview

This chapter walks through the fundamental SQL operations in ClickHouse: data definition language (DDL), data manipulation language (DML), and data query language (DQL). ClickHouse supports standard SQL syntax and adds many extensions optimized for analytical workloads.

DDL (Data Definition Language) Operations

Database Operations

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
from datetime import datetime
import json

class DatabaseEngine(Enum):
    """数据库引擎类型"""
    ORDINARY = "Ordinary"
    MEMORY = "Memory"
    LAZY = "Lazy"
    ATOMIC = "Atomic"
    MYSQL = "MySQL"
    POSTGRESQL = "PostgreSQL"
    REPLICATED = "Replicated"

class DDLOperationType(Enum):
    """DDL操作类型"""
    CREATE_DATABASE = "create_database"
    DROP_DATABASE = "drop_database"
    CREATE_TABLE = "create_table"
    DROP_TABLE = "drop_table"
    ALTER_TABLE = "alter_table"
    RENAME_TABLE = "rename_table"
    TRUNCATE_TABLE = "truncate_table"

@dataclass
class DatabaseDefinition:
    """数据库定义"""
    name: str
    engine: DatabaseEngine
    comment: str = ""
    settings: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.settings is None:
            self.settings = {}

@dataclass
class ColumnDefinition:
    """列定义"""
    name: str
    data_type: str
    nullable: bool = False
    default_value: Any = None
    comment: str = ""
    codec: str = ""
    
@dataclass
class TableDefinition:
    """表定义"""
    name: str
    database: str
    columns: List[ColumnDefinition]
    engine: str
    order_by: List[str]
    partition_by: List[str] = None
    primary_key: List[str] = None
    sample_by: str = ""
    settings: Dict[str, Any] = None
    comment: str = ""
    
    def __post_init__(self):
        if self.partition_by is None:
            self.partition_by = []
        if self.primary_key is None:
            self.primary_key = []
        if self.settings is None:
            self.settings = {}

@dataclass
class DDLOperation:
    """DDL操作记录"""
    operation_type: DDLOperationType
    target_name: str
    sql_statement: str
    execution_time: datetime
    success: bool
    error_message: str = ""
    affected_objects: List[str] = None
    
    def __post_init__(self):
        if self.affected_objects is None:
            self.affected_objects = []

class ClickHouseDDLManager:
    """ClickHouse DDL操作管理器"""
    
    def __init__(self):
        self.databases: Dict[str, DatabaseDefinition] = {}
        self.tables: Dict[str, TableDefinition] = {}
        self.operation_history: List[DDLOperation] = []
        self._initialize_system_databases()
    
    def _initialize_system_databases(self):
        """初始化系统数据库"""
        system_databases = [
            DatabaseDefinition("system", DatabaseEngine.ORDINARY, "系统数据库"),
            DatabaseDefinition("information_schema", DatabaseEngine.MEMORY, "信息模式"),
            DatabaseDefinition("INFORMATION_SCHEMA", DatabaseEngine.MEMORY, "信息模式(大写)")
        ]
        
        for db in system_databases:
            self.databases[db.name] = db
    
    def create_database(self, db_def: DatabaseDefinition, if_not_exists: bool = True) -> Dict[str, Any]:
        """创建数据库"""
        if db_def.name in self.databases and not if_not_exists:
            return {
                "success": False,
                "error": f"Database {db_def.name} already exists",
                "sql": ""
            }
        
        # Build the SQL statement
        sql_parts = ["CREATE DATABASE"]
        if if_not_exists:
            sql_parts.append("IF NOT EXISTS")
        sql_parts.append(f"`{db_def.name}`")
        
        if db_def.engine != DatabaseEngine.ORDINARY:
            sql_parts.append(f"ENGINE = {db_def.engine.value}")
        
        if db_def.comment:
            sql_parts.append(f"COMMENT '{db_def.comment}'")
        
        sql = " ".join(sql_parts) + ";"
        
        # Record the operation
        operation = DDLOperation(
            operation_type=DDLOperationType.CREATE_DATABASE,
            target_name=db_def.name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True
        )
        
        self.databases[db_def.name] = db_def
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "database": db_def.name
        }
    
    def drop_database(self, database_name: str, if_exists: bool = True) -> Dict[str, Any]:
        """删除数据库"""
        if database_name not in self.databases and not if_exists:
            return {
                "success": False,
                "error": f"Database {database_name} does not exist",
                "sql": ""
            }
        
        # Build the SQL statement
        sql_parts = ["DROP DATABASE"]
        if if_exists:
            sql_parts.append("IF EXISTS")
        sql_parts.append(f"`{database_name}`")
        
        sql = " ".join(sql_parts) + ";"
        
        # Record the operation
        affected_tables = [table_name for table_name, table_def in self.tables.items() 
                          if table_def.database == database_name]
        
        operation = DDLOperation(
            operation_type=DDLOperationType.DROP_DATABASE,
            target_name=database_name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True,
            affected_objects=affected_tables
        )
        
        # Remove the database and its tables
        if database_name in self.databases:
            del self.databases[database_name]
        
        # Remove all tables in this database
        tables_to_remove = [table_name for table_name, table_def in self.tables.items() 
                           if table_def.database == database_name]
        for table_name in tables_to_remove:
            del self.tables[table_name]
        
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "affected_tables": affected_tables
        }
    
    def create_table(self, table_def: TableDefinition, if_not_exists: bool = True) -> Dict[str, Any]:
        """创建表"""
        table_full_name = f"{table_def.database}.{table_def.name}"
        
        if table_full_name in self.tables and not if_not_exists:
            return {
                "success": False,
                "error": f"Table {table_full_name} already exists",
                "sql": ""
            }
        
        # Build the SQL statement
        sql_parts = ["CREATE TABLE"]
        if if_not_exists:
            sql_parts.append("IF NOT EXISTS")
        sql_parts.append(f"`{table_def.database}`.`{table_def.name}`")
        
        # Column definitions
        column_definitions = []
        for col in table_def.columns:
            col_def = f"`{col.name}` {col.data_type}"
            if col.nullable:
                col_def += " NULL"
            if col.default_value is not None:
                if isinstance(col.default_value, str):
                    col_def += f" DEFAULT '{col.default_value}'"
                else:
                    col_def += f" DEFAULT {col.default_value}"
            if col.comment:
                col_def += f" COMMENT '{col.comment}'"
            if col.codec:
                col_def += f" CODEC({col.codec})"
            column_definitions.append(col_def)
        
        sql_parts.append(f"(\n    {',\n    '.join(column_definitions)}\n)")
        
        # Engine
        sql_parts.append(f"ENGINE = {table_def.engine}")
        
        # ORDER BY
        if table_def.order_by:
            sql_parts.append(f"ORDER BY ({', '.join(table_def.order_by)})")
        
        # PARTITION BY
        if table_def.partition_by:
            sql_parts.append(f"PARTITION BY ({', '.join(table_def.partition_by)})")
        
        # PRIMARY KEY
        if table_def.primary_key:
            sql_parts.append(f"PRIMARY KEY ({', '.join(table_def.primary_key)})")
        
        # SAMPLE BY
        if table_def.sample_by:
            sql_parts.append(f"SAMPLE BY {table_def.sample_by}")
        
        # SETTINGS
        if table_def.settings:
            settings_str = ", ".join([f"{k} = {v}" for k, v in table_def.settings.items()])
            sql_parts.append(f"SETTINGS {settings_str}")
        
        # COMMENT
        if table_def.comment:
            sql_parts.append(f"COMMENT '{table_def.comment}'")
        
        sql = " ".join(sql_parts) + ";"
        
        # Record the operation
        operation = DDLOperation(
            operation_type=DDLOperationType.CREATE_TABLE,
            target_name=table_full_name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True
        )
        
        self.tables[table_full_name] = table_def
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "table": table_full_name
        }
    
    def drop_table(self, database: str, table: str, if_exists: bool = True) -> Dict[str, Any]:
        """删除表"""
        table_full_name = f"{database}.{table}"
        
        if table_full_name not in self.tables and not if_exists:
            return {
                "success": False,
                "error": f"Table {table_full_name} does not exist",
                "sql": ""
            }
        
        # Build the SQL statement
        sql_parts = ["DROP TABLE"]
        if if_exists:
            sql_parts.append("IF EXISTS")
        sql_parts.append(f"`{database}`.`{table}`")
        
        sql = " ".join(sql_parts) + ";"
        
        # Record the operation
        operation = DDLOperation(
            operation_type=DDLOperationType.DROP_TABLE,
            target_name=table_full_name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True
        )
        
        if table_full_name in self.tables:
            del self.tables[table_full_name]
        
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "table": table_full_name
        }
    
    def alter_table_add_column(self, database: str, table: str, column: ColumnDefinition, after_column: str = "") -> Dict[str, Any]:
        """添加列"""
        table_full_name = f"{database}.{table}"
        
        if table_full_name not in self.tables:
            return {
                "success": False,
                "error": f"Table {table_full_name} does not exist",
                "sql": ""
            }
        
        # Build the SQL statement
        sql_parts = [f"ALTER TABLE `{database}`.`{table}` ADD COLUMN"]
        
        col_def = f"`{column.name}` {column.data_type}"
        if column.nullable:
            col_def += " NULL"
        if column.default_value is not None:
            if isinstance(column.default_value, str):
                col_def += f" DEFAULT '{column.default_value}'"
            else:
                col_def += f" DEFAULT {column.default_value}"
        if column.comment:
            col_def += f" COMMENT '{column.comment}'"
        
        sql_parts.append(col_def)
        
        if after_column:
            sql_parts.append(f"AFTER `{after_column}`")
        
        sql = " ".join(sql_parts) + ";"
        
        # Update the table definition
        table_def = self.tables[table_full_name]
        if after_column:
            # Find the position of the specified column
            insert_index = len(table_def.columns)
            for i, col in enumerate(table_def.columns):
                if col.name == after_column:
                    insert_index = i + 1
                    break
            table_def.columns.insert(insert_index, column)
        else:
            table_def.columns.append(column)
        
        # Record the operation
        operation = DDLOperation(
            operation_type=DDLOperationType.ALTER_TABLE,
            target_name=table_full_name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True
        )
        
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "table": table_full_name,
            "column_added": column.name
        }
    
    def alter_table_drop_column(self, database: str, table: str, column_name: str) -> Dict[str, Any]:
        """删除列"""
        table_full_name = f"{database}.{table}"
        
        if table_full_name not in self.tables:
            return {
                "success": False,
                "error": f"Table {table_full_name} does not exist",
                "sql": ""
            }
        
        # Build the SQL statement
        sql = f"ALTER TABLE `{database}`.`{table}` DROP COLUMN `{column_name}`;"
        
        # Update the table definition
        table_def = self.tables[table_full_name]
        table_def.columns = [col for col in table_def.columns if col.name != column_name]
        
        # Record the operation
        operation = DDLOperation(
            operation_type=DDLOperationType.ALTER_TABLE,
            target_name=table_full_name,
            sql_statement=sql,
            execution_time=datetime.now(),
            success=True
        )
        
        self.operation_history.append(operation)
        
        return {
            "success": True,
            "sql": sql,
            "table": table_full_name,
            "column_dropped": column_name
        }
    
    def get_database_list(self) -> List[Dict[str, Any]]:
        """获取数据库列表"""
        return [
            {
                "name": db.name,
                "engine": db.engine.value,
                "comment": db.comment,
                "table_count": len([t for t in self.tables.values() if t.database == db.name])
            }
            for db in self.databases.values()
        ]
    
    def get_table_list(self, database: str = "") -> List[Dict[str, Any]]:
        """获取表列表"""
        tables = self.tables.values()
        if database:
            tables = [t for t in tables if t.database == database]
        
        return [
            {
                "database": table.database,
                "name": table.name,
                "engine": table.engine,
                "column_count": len(table.columns),
                "comment": table.comment
            }
            for table in tables
        ]
    
    def get_table_schema(self, database: str, table: str) -> Dict[str, Any]:
        """获取表结构"""
        table_full_name = f"{database}.{table}"
        
        if table_full_name not in self.tables:
            return {"error": f"Table {table_full_name} does not exist"}
        
        table_def = self.tables[table_full_name]
        
        return {
            "database": table_def.database,
            "table": table_def.name,
            "engine": table_def.engine,
            "columns": [
                {
                    "name": col.name,
                    "type": col.data_type,
                    "nullable": col.nullable,
                    "default": col.default_value,
                    "comment": col.comment,
                    "codec": col.codec
                }
                for col in table_def.columns
            ],
            "order_by": table_def.order_by,
            "partition_by": table_def.partition_by,
            "primary_key": table_def.primary_key,
            "sample_by": table_def.sample_by,
            "settings": table_def.settings,
            "comment": table_def.comment
        }
    
    def get_operation_history(self, limit: int = 10) -> List[Dict[str, Any]]:
        """获取操作历史"""
        recent_operations = self.operation_history[-limit:] if limit > 0 else self.operation_history
        
        return [
            {
                "operation_type": op.operation_type.value,
                "target_name": op.target_name,
                "sql_statement": op.sql_statement,
                "execution_time": op.execution_time.isoformat(),
                "success": op.success,
                "error_message": op.error_message,
                "affected_objects": op.affected_objects
            }
            for op in reversed(recent_operations)
        ]
    
    def generate_create_table_template(self, table_name: str, database: str = "default") -> str:
        """生成创建表模板"""
        template = f"""CREATE TABLE IF NOT EXISTS `{database}`.`{table_name}` (
    `id` UInt64 COMMENT '主键ID',
    `name` String COMMENT '名称',
    `status` Enum8('active' = 1, 'inactive' = 0) DEFAULT 'active' COMMENT '状态',
    `created_at` DateTime DEFAULT now() COMMENT '创建时间',
    `updated_at` DateTime DEFAULT now() COMMENT '更新时间'
) ENGINE = MergeTree()
ORDER BY (id)
PARTITION BY toYYYYMM(created_at)
SETTINGS index_granularity = 8192
COMMENT '示例表';"""
        
        return template

# DDL operation examples
print("\n=== ClickHouse DDL Operation Management ===")

ddl_manager = ClickHouseDDLManager()

print("\n1. 数据库操作:")
# 创建数据库
test_db = DatabaseDefinition(
    name="analytics",
    engine=DatabaseEngine.ATOMIC,
    comment="分析数据库"
)

result = ddl_manager.create_database(test_db)
print(f"\n   创建数据库:")
print(f"     SQL: {result['sql']}")
print(f"     成功: {result['success']}")

print("\n   数据库列表:")
for db in ddl_manager.get_database_list():
    print(f"     - {db['name']} ({db['engine']}) - {db['table_count']} 个表")

print("\n2. 表操作:")
# 创建表
user_table = TableDefinition(
    name="users",
    database="analytics",
    columns=[
        ColumnDefinition("id", "UInt64", comment="用户ID"),
        ColumnDefinition("username", "String", comment="用户名"),
        ColumnDefinition("email", "String", comment="邮箱"),
        ColumnDefinition("age", "UInt8", nullable=True, comment="年龄"),
        ColumnDefinition("created_at", "DateTime", default_value="now()", comment="创建时间"),
        ColumnDefinition("status", "Enum8('active' = 1, 'inactive' = 0)", default_value="'active'", comment="状态")
    ],
    engine="MergeTree()",
    order_by=["id"],
    partition_by=["toYYYYMM(created_at)"],
    settings={"index_granularity": 8192},
    comment="用户表"
)

result = ddl_manager.create_table(user_table)
print(f"\n   创建表:")
print(f"     表名: {result['table']}")
print(f"     成功: {result['success']}")
print(f"     SQL: {result['sql'][:100]}...")

print("\n3. 表结构修改:")
# 添加列
new_column = ColumnDefinition(
    name="last_login",
    data_type="DateTime",
    nullable=True,
    comment="最后登录时间"
)

result = ddl_manager.alter_table_add_column("analytics", "users", new_column)
print(f"\n   添加列:")
print(f"     SQL: {result['sql']}")
print(f"     成功: {result['success']}")

print("\n4. 表结构查看:")
schema = ddl_manager.get_table_schema("analytics", "users")
print(f"\n   表: {schema['database']}.{schema['table']}")
print(f"   引擎: {schema['engine']}")
print(f"   列数: {len(schema['columns'])}")
print("   列信息:")
for col in schema['columns']:
    nullable_str = " (可空)" if col['nullable'] else ""
    default_str = f" 默认:{col['default']}" if col['default'] else ""
    print(f"     - {col['name']}: {col['type']}{nullable_str}{default_str} - {col['comment']}")

print("\n5. 操作历史:")
history = ddl_manager.get_operation_history(5)
for i, op in enumerate(history, 1):
    print(f"\n   {i}. {op['operation_type']} - {op['target_name']}")
    print(f"      时间: {op['execution_time']}")
    print(f"      成功: {op['success']}")
    print(f"      SQL: {op['sql_statement'][:80]}...")

print("\n6. 表模板生成:")
template = ddl_manager.generate_create_table_template("example_table", "analytics")
print(f"\n   模板SQL:")
print(f"     {template[:200]}...")

DML (Data Manipulation Language) Operations

Data Insertion Operations

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
from datetime import datetime, date
import json
import random

class InsertFormat(Enum):
    """插入数据格式"""
    VALUES = "VALUES"
    SELECT = "SELECT"
    CSV = "CSV"
    JSON = "JSON"
    TSV = "TSV"
    PARQUET = "Parquet"

class DataSource(Enum):
    """数据源类型"""
    MANUAL = "manual"
    FILE = "file"
    URL = "url"
    TABLE = "table"
    FUNCTION = "function"

@dataclass
class InsertOperation:
    """插入操作定义"""
    table_name: str
    database: str
    format_type: InsertFormat
    data_source: DataSource
    columns: List[str] = None
    data: Any = None
    source_path: str = ""
    settings: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.columns is None:
            self.columns = []
        if self.settings is None:
            self.settings = {}

@dataclass
class BatchInsertConfig:
    """批量插入配置"""
    batch_size: int = 1000
    max_memory_usage: int = 10000000000  # 10GB
    max_execution_time: int = 3600  # 1小时
    async_insert: bool = False
    wait_for_async_insert: bool = True
    deduplicate: bool = True
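    # Note on async_insert: the server buffers small inserts and flushes them in
    # the background; wait_for_async_insert controls whether the client is
    # acknowledged only after the buffer is flushed (safer) or immediately.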

class ClickHouseDMLManager:
    """ClickHouse DML操作管理器"""
    
    def __init__(self):
        self.insert_history: List[Dict[str, Any]] = []
        self.data_generators = self._initialize_data_generators()
    
    def _initialize_data_generators(self) -> Dict[str, Any]:
        """初始化数据生成器"""
        return {
            "user_names": ["Alice", "Bob", "Charlie", "Diana", "Eve", "Frank", "Grace", "Henry"],
            "domains": ["gmail.com", "yahoo.com", "hotmail.com", "outlook.com", "company.com"],
            "statuses": ["active", "inactive"],
            "countries": ["CN", "US", "JP", "DE", "FR", "GB", "CA", "AU"]
        }
    
    def generate_insert_values(self, table_schema: Dict[str, Any], row_count: int = 10) -> List[Dict[str, Any]]:
        """生成插入数据"""
        rows = []
        
        for i in range(row_count):
            row = {}
            for col in table_schema['columns']:
                col_name = col['name']
                col_type = col['type'].lower()
                
                if 'id' in col_name.lower():
                    row[col_name] = i + 1
                elif 'name' in col_name.lower():
                    row[col_name] = f"'{random.choice(self.data_generators['user_names'])}{i}"
                elif 'email' in col_name.lower():
                    username = f"user{i}"
                    domain = random.choice(self.data_generators['domains'])
                    row[col_name] = f"'{username}@{domain}'"
                elif 'age' in col_name.lower():
                    row[col_name] = random.randint(18, 80) if not col['nullable'] or random.random() > 0.1 else 'NULL'
                elif 'status' in col_name.lower():
                    row[col_name] = f"'{random.choice(self.data_generators['statuses'])}'"
                elif 'country' in col_name.lower():
                    row[col_name] = f"'{random.choice(self.data_generators['countries'])}'"
                elif 'created_at' in col_name.lower() or 'datetime' in col_type:
                    if col.get('default') == 'now()':
                        row[col_name] = 'now()'
                    else:
                        row[col_name] = f"'{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}'"
                elif 'date' in col_type:
                    row[col_name] = f"'{date.today().strftime('%Y-%m-%d')}'"
                elif col_type.startswith('uint') or col_type.startswith('int'):
                    row[col_name] = random.randint(1, 1000)
                elif col_type.startswith('float') or col_type.startswith('decimal'):
                    row[col_name] = round(random.uniform(1.0, 1000.0), 2)
                elif col_type == 'string':
                    row[col_name] = f"'sample_text_{i}'"
                else:
                    row[col_name] = f"'default_value_{i}'"
            
            rows.append(row)
        
        return rows
    
    def create_insert_statement(self, operation: InsertOperation, data: List[Dict[str, Any]] = None) -> str:
        """创建插入语句"""
        table_ref = f"`{operation.database}`.`{operation.table_name}`"
        
        if operation.format_type == InsertFormat.VALUES:
            sql_parts = [f"INSERT INTO {table_ref}"]
            
            if operation.columns:
                columns_str = ", ".join([f"`{col}`" for col in operation.columns])
                sql_parts.append(f"({columns_str})")
            
            sql_parts.append("VALUES")
            
            if data:
                value_rows = []
                for row in data:
                    if operation.columns:
                        values = [str(row.get(col, 'NULL')) for col in operation.columns]
                    else:
                        values = [str(v) for v in row.values()]
                    value_rows.append(f"({', '.join(values)})")
                
                sql_parts.append(",\n    ".join(value_rows))
            else:
                sql_parts.append("(...)")
        
        elif operation.format_type == InsertFormat.SELECT:
            sql_parts = [f"INSERT INTO {table_ref}"]
            
            if operation.columns:
                columns_str = ", ".join([f"`{col}`" for col in operation.columns])
                sql_parts.append(f"({columns_str})")
            
            if operation.source_path:
                sql_parts.append(f"SELECT * FROM {operation.source_path}")
            else:
                sql_parts.append("SELECT ...")
        
        elif operation.format_type in [InsertFormat.CSV, InsertFormat.JSON, InsertFormat.TSV, InsertFormat.PARQUET]:
            sql_parts = [f"INSERT INTO {table_ref}"]
            sql_parts.append(f"FROM INFILE '{operation.source_path}'")
            sql_parts.append(f"FORMAT {operation.format_type.value}")
        
        # Append settings
        if operation.settings:
            settings_str = ", ".join([f"{k} = {v}" for k, v in operation.settings.items()])
            sql_parts.append(f"SETTINGS {settings_str}")
        
        return " ".join(sql_parts) + ";"
    
    def create_batch_insert(self, operation: InsertOperation, batch_config: BatchInsertConfig, 
                           total_rows: int = 10000) -> List[str]:
        """创建批量插入语句"""
        statements = []
        
        # Compute the number of batches
        batch_count = (total_rows + batch_config.batch_size - 1) // batch_config.batch_size
        
        for batch_num in range(batch_count):
            start_row = batch_num * batch_config.batch_size
            end_row = min(start_row + batch_config.batch_size, total_rows)
            batch_size = end_row - start_row
            
            # Generate batch data (simulated)
            batch_operation = InsertOperation(
                table_name=operation.table_name,
                database=operation.database,
                format_type=operation.format_type,
                data_source=operation.data_source,
                columns=operation.columns,
                settings={
                    **operation.settings,
                    "max_memory_usage": batch_config.max_memory_usage,
                    "max_execution_time": batch_config.max_execution_time
                }
            )
            
            if batch_config.async_insert:
                batch_operation.settings["async_insert"] = 1
                batch_operation.settings["wait_for_async_insert"] = 1 if batch_config.wait_for_async_insert else 0
            
            if batch_config.deduplicate:
                batch_operation.settings["insert_deduplicate"] = 1
            
            # Build the statement (simplified)
            table_ref = f"`{operation.database}`.`{operation.table_name}`"
            sql = f"INSERT INTO {table_ref} VALUES /* Batch {batch_num + 1}: {batch_size} rows */"
            
            if batch_operation.settings:
                settings_str = ", ".join([f"{k} = {v}" for k, v in batch_operation.settings.items()])
                sql += f" SETTINGS {settings_str}"
            
            sql += ";"
            statements.append(sql)
        
        return statements
    
    def create_insert_from_file(self, database: str, table: str, file_path: str, 
                               file_format: InsertFormat, columns: List[str] = None) -> str:
        """从文件插入数据"""
        table_ref = f"`{database}`.`{table}`"
        
        sql_parts = [f"INSERT INTO {table_ref}"]
        
        if columns:
            columns_str = ", ".join([f"`{col}`" for col in columns])
            sql_parts.append(f"({columns_str})")
        
        if file_format == InsertFormat.CSV:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "SETTINGS input_format_csv_skip_first_lines = 1",
                "FORMAT CSV"
            ])
        elif file_format == InsertFormat.JSON:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "FORMAT JSONEachRow"
            ])
        elif file_format == InsertFormat.TSV:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "SETTINGS input_format_tsv_skip_first_lines = 1",
                "FORMAT TSV"
            ])
        elif file_format == InsertFormat.PARQUET:
            sql_parts.extend([
                f"FROM INFILE '{file_path}'",
                "FORMAT Parquet"
            ])
        
        return " ".join(sql_parts) + ";"
    
    def create_insert_from_url(self, database: str, table: str, url: str, 
                              file_format: InsertFormat, columns: List[str] = None) -> str:
        """从URL插入数据"""
        table_ref = f"`{database}`.`{table}`"
        
        sql_parts = [f"INSERT INTO {table_ref}"]
        
        if columns:
            columns_str = ", ".join([f"`{col}`" for col in columns])
            sql_parts.append(f"({columns_str})")
        
        sql_parts.append(f"SELECT * FROM url('{url}', '{file_format.value}')")
        
        return " ".join(sql_parts) + ";"
    
    def create_upsert_statement(self, database: str, table: str, data: List[Dict[str, Any]], 
                               key_columns: List[str]) -> str:
        """创建UPSERT语句(使用ReplacingMergeTree)"""
        table_ref = f"`{database}`.`{table}`"
        
        # ClickHouse has no native UPSERT; emulate it with INSERT + OPTIMIZE
        insert_sql = f"INSERT INTO {table_ref} VALUES"
        
        value_rows = []
        for row in data:
            values = [str(v) for v in row.values()]
            value_rows.append(f"({', '.join(values)})")
        
        insert_sql += " " + ",\n    ".join(value_rows) + ";"
        
        # Follow-up statement to apply the deduplication
        optimize_sql = f"OPTIMIZE TABLE {table_ref} FINAL;"
        
        return f"{insert_sql}\n\n-- Optimize the table to apply deduplication\n{optimize_sql}"
    
    def analyze_insert_performance(self, operation: InsertOperation, estimated_rows: int) -> Dict[str, Any]:
        """分析插入性能"""
        analysis = {
            "operation": f"{operation.database}.{operation.table_name}",
            "format": operation.format_type.value,
            "estimated_rows": estimated_rows,
            "performance_score": 0,
            "recommendations": [],
            "estimated_time": "unknown",
            "memory_usage": "unknown"
        }
        
        # Score performance by format
        if operation.format_type == InsertFormat.VALUES:
            if estimated_rows < 1000:
                analysis["performance_score"] = 8
                analysis["estimated_time"] = "< 1 second"
            elif estimated_rows < 100000:
                analysis["performance_score"] = 6
                analysis["estimated_time"] = "1-10 seconds"
                analysis["recommendations"].append("Consider batched inserts")
            else:
                analysis["performance_score"] = 3
                analysis["estimated_time"] = "> 30 seconds"
                analysis["recommendations"].append("Prefer file import or batched inserts")
        
        elif operation.format_type in [InsertFormat.CSV, InsertFormat.TSV, InsertFormat.PARQUET]:
            analysis["performance_score"] = 9
            analysis["estimated_time"] = "fast"
            analysis["memory_usage"] = "low"
            if operation.format_type == InsertFormat.PARQUET:
                analysis["recommendations"].append("Parquet offers the best performance")
        
        elif operation.format_type == InsertFormat.JSON:
            analysis["performance_score"] = 7
            analysis["estimated_time"] = "moderate"
            analysis["recommendations"].append("JSON parsing can be slow; consider JSONEachRow")
        
        # Volume-based advice
        if estimated_rows > 1000000:
            analysis["recommendations"].append("Use async inserts for large data volumes")
            analysis["recommendations"].append("Consider optimizing the partitioning strategy")
        
        return analysis
    
    def generate_sample_data_sql(self, database: str, table: str, row_count: int = 1000) -> str:
        """生成示例数据插入SQL"""
        return f"""-- 生成 {row_count} 行示例数据
INSERT INTO `{database}`.`{table}`
SELECT 
    number AS id,
    concat('user_', toString(number)) AS username,
    concat('user_', toString(number), '@example.com') AS email,
    18 + (number % 50) AS age,
    now() - INTERVAL (number % 365) DAY AS created_at,
    if(number % 10 = 0, 'inactive', 'active') AS status
FROM numbers({row_count});"""
    
    def get_insert_statistics(self) -> Dict[str, Any]:
        """获取插入统计信息"""
        total_operations = len(self.insert_history)
        
        if total_operations == 0:
            return {
                "total_operations": 0,
                "success_rate": 0,
                "average_rows_per_operation": 0,
                "most_used_format": "none",
                "total_rows_inserted": 0
            }
        
        successful_ops = sum(1 for op in self.insert_history if op.get('success', False))
        total_rows = sum(op.get('rows_affected', 0) for op in self.insert_history)
        
        format_counts = {}
        for op in self.insert_history:
            fmt = op.get('format', 'unknown')
            format_counts[fmt] = format_counts.get(fmt, 0) + 1
        
        most_used_format = max(format_counts.items(), key=lambda x: x[1])[0] if format_counts else "none"
        
        return {
            "total_operations": total_operations,
            "success_rate": (successful_ops / total_operations) * 100,
            "average_rows_per_operation": total_rows / total_operations if total_operations > 0 else 0,
            "most_used_format": most_used_format,
            "total_rows_inserted": total_rows,
            "format_distribution": format_counts
        }

# DML operation examples
print("\n=== ClickHouse DML Operation Management ===")

dml_manager = ClickHouseDMLManager()

# Simulated table schema
user_table_schema = {
    'database': 'analytics',
    'table': 'users',
    'columns': [
        {'name': 'id', 'type': 'UInt64', 'nullable': False},
        {'name': 'username', 'type': 'String', 'nullable': False},
        {'name': 'email', 'type': 'String', 'nullable': False},
        {'name': 'age', 'type': 'UInt8', 'nullable': True},
        {'name': 'created_at', 'type': 'DateTime', 'nullable': False, 'default': 'now()'},
        {'name': 'status', 'type': 'Enum8', 'nullable': False}
    ]
}

print("\n1. 基本插入操作:")
# VALUES插入
values_operation = InsertOperation(
    table_name="users",
    database="analytics",
    format_type=InsertFormat.VALUES,
    data_source=DataSource.MANUAL,
    columns=["id", "username", "email", "age", "status"]
)

# Generate sample data
sample_data = dml_manager.generate_insert_values(user_table_schema, 5)
insert_sql = dml_manager.create_insert_statement(values_operation, sample_data)
print(f"\n   VALUES插入:")
print(f"     SQL: {insert_sql[:150]}...")

print("\n2. 文件导入操作:")
# CSV文件导入
csv_sql = dml_manager.create_insert_from_file(
    "analytics", "users", "/data/users.csv", InsertFormat.CSV,
    ["username", "email", "age", "status"]
)
print(f"\n   CSV导入:")
print(f"     SQL: {csv_sql}")

# JSON file import
json_sql = dml_manager.create_insert_from_file(
    "analytics", "users", "/data/users.json", InsertFormat.JSON
)
print(f"\n   JSON导入:")
print(f"     SQL: {json_sql}")

print("\n3. URL数据导入:")
url_sql = dml_manager.create_insert_from_url(
    "analytics", "users", 
    "https://example.com/data/users.csv", 
    InsertFormat.CSV
)
print(f"\n   URL导入:")
print(f"     SQL: {url_sql}")

print("\n4. 批量插入配置:")
batch_config = BatchInsertConfig(
    batch_size=5000,
    async_insert=True,
    wait_for_async_insert=True,
    deduplicate=True
)

batch_statements = dml_manager.create_batch_insert(values_operation, batch_config, 20000)
print(f"\n   批量插入 (20000行, 批次大小: {batch_config.batch_size}):")
print(f"     批次数量: {len(batch_statements)}")
for i, stmt in enumerate(batch_statements[:3], 1):
    print(f"     批次 {i}: {stmt[:100]}...")

print("\n5. 性能分析:")
performance_tests = [
    (InsertFormat.VALUES, 500),
    (InsertFormat.VALUES, 50000),
    (InsertFormat.CSV, 100000),
    (InsertFormat.PARQUET, 1000000)
]

for fmt, rows in performance_tests:
    test_op = InsertOperation(
        table_name="users",
        database="analytics",
        format_type=fmt,
        data_source=DataSource.FILE
    )
    
    analysis = dml_manager.analyze_insert_performance(test_op, rows)
    print(f"\n   {fmt.value} ({rows:,} 行):")
    print(f"     性能评分: {analysis['performance_score']}/10")
    print(f"     预估时间: {analysis['estimated_time']}")
    if analysis['recommendations']:
        print(f"     建议: {analysis['recommendations'][0]}")

print("\n6. 示例数据生成:")
sample_sql = dml_manager.generate_sample_data_sql("analytics", "users", 10000)
print(f"\n   生成10000行示例数据:")
print(f"     {sample_sql[:200]}...")

print("\n7. UPSERT操作:")
upsert_data = [
    {'id': 1, 'username': "'alice_updated'", 'email': "'alice@newdomain.com'", 'age': 26, 'status': "'active'"},
    {'id': 2, 'username': "'bob_updated'", 'email': "'bob@newdomain.com'", 'age': 31, 'status': "'inactive'"}
]

upsert_sql = dml_manager.create_upsert_statement("analytics", "users", upsert_data, ["id"])
print(f"\n   UPSERT操作:")
print(f"     {upsert_sql[:150]}...")

# Simulated insert history
dml_manager.insert_history = [
    {'format': 'VALUES', 'success': True, 'rows_affected': 1000},
    {'format': 'CSV', 'success': True, 'rows_affected': 50000},
    {'format': 'JSON', 'success': False, 'rows_affected': 0},
    {'format': 'PARQUET', 'success': True, 'rows_affected': 100000}
]

print("\n8. 插入统计:")
stats = dml_manager.get_insert_statistics()
print(f"\n   总操作数: {stats['total_operations']}")
print(f"   成功率: {stats['success_rate']:.1f}%")
print(f"   平均每次插入行数: {stats['average_rows_per_operation']:,.0f}")
print(f"   最常用格式: {stats['most_used_format']}")
print(f"   总插入行数: {stats['total_rows_inserted']:,}")
print(f"   格式分布: {stats['format_distribution']}")

DQL (Data Query Language) Operations

Basic Query Operations

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Union
from datetime import datetime, timedelta
import json

class QueryType(Enum):
    """查询类型"""
    SELECT = "SELECT"
    AGGREGATE = "AGGREGATE"
    WINDOW = "WINDOW"
    SUBQUERY = "SUBQUERY"
    CTE = "CTE"
    UNION = "UNION"
    JOIN = "JOIN"

class JoinType(Enum):
    """连接类型"""
    INNER = "INNER JOIN"
    LEFT = "LEFT JOIN"
    RIGHT = "RIGHT JOIN"
    FULL = "FULL OUTER JOIN"
    CROSS = "CROSS JOIN"
    ASOF = "ASOF JOIN"
    ARRAY = "ARRAY JOIN"

class AggregateFunction(Enum):
    """聚合函数"""
    COUNT = "count"
    SUM = "sum"
    AVG = "avg"
    MIN = "min"
    MAX = "max"
    UNIQ = "uniq"
    UNIQ_EXACT = "uniqExact"
    GROUP_ARRAY = "groupArray"
    GROUP_CONCAT = "groupConcat"
    QUANTILE = "quantile"
    MEDIAN = "median"
    STDDEV = "stddevPop"
    VAR = "varPop"

class WindowFunction(Enum):
    """窗口函数"""
    ROW_NUMBER = "row_number"
    RANK = "rank"
    DENSE_RANK = "dense_rank"
    LAG = "lag"
    LEAD = "lead"
    FIRST_VALUE = "first_value"
    LAST_VALUE = "last_value"
    NTH_VALUE = "nth_value"
    PERCENT_RANK = "percent_rank"
    CUME_DIST = "cume_dist"

@dataclass
class SelectClause:
    """SELECT子句定义"""
    columns: List[str]
    distinct: bool = False
    limit: Optional[int] = None
    offset: Optional[int] = None

@dataclass
class FromClause:
    """FROM子句定义"""
    table_name: str
    database: str = ""
    alias: str = ""
    sample_ratio: Optional[float] = None

@dataclass
class JoinClause:
    """JOIN子句定义"""
    join_type: JoinType
    table: FromClause
    on_condition: str
    using_columns: List[str] = None
    
    def __post_init__(self):
        if self.using_columns is None:
            self.using_columns = []

@dataclass
class WhereClause:
    """WHERE子句定义"""
    conditions: List[str]
    
@dataclass
class GroupByClause:
    """GROUP BY子句定义"""
    columns: List[str]
    with_rollup: bool = False
    with_cube: bool = False
    with_totals: bool = False

@dataclass
class OrderByClause:
    """ORDER BY子句定义"""
    columns: List[str]
    directions: List[str] = None  # ASC, DESC
    
    def __post_init__(self):
        if self.directions is None:
            self.directions = ["ASC"] * len(self.columns)

@dataclass
class QueryDefinition:
    """查询定义"""
    select_clause: SelectClause
    from_clause: FromClause
    joins: List[JoinClause] = None
    where_clause: Optional[WhereClause] = None
    group_by_clause: Optional[GroupByClause] = None
    having_clause: Optional[WhereClause] = None
    order_by_clause: Optional[OrderByClause] = None
    query_type: QueryType = QueryType.SELECT
    settings: Dict[str, Any] = None
    
    def __post_init__(self):
        if self.joins is None:
            self.joins = []
        if self.settings is None:
            self.settings = {}

class ClickHouseDQLManager:
    """ClickHouse DQL操作管理器"""
    
    def __init__(self):
        self.query_history: List[Dict[str, Any]] = []
        self.common_functions = self._initialize_common_functions()
    
    def _initialize_common_functions(self) -> Dict[str, List[str]]:
        """初始化常用函数"""
        return {
            "string_functions": [
                "length", "lower", "upper", "substring", "concat", "replace", 
                "trim", "ltrim", "rtrim", "split", "extract", "match"
            ],
            "date_functions": [
                "now", "today", "yesterday", "toDate", "toDateTime", "formatDateTime",
                "addDays", "addMonths", "addYears", "dateDiff", "toYYYYMM", "toStartOfMonth"
            ],
            "math_functions": [
                "abs", "round", "floor", "ceil", "sqrt", "pow", "log", "exp",
                "sin", "cos", "tan", "greatest", "least", "rand"
            ],
            "array_functions": [
                "arrayJoin", "arrayElement", "arrayLength", "arrayConcat", "arraySlice",
                "arrayUniq", "arraySort", "arrayReverse", "arrayFilter", "arrayMap"
            ],
            "conditional_functions": [
                "if", "multiIf", "case", "coalesce", "nullIf", "assumeNotNull"
            ]
        }
    
    def build_select_query(self, query_def: QueryDefinition) -> str:
        """构建SELECT查询"""
        sql_parts = []
        
        # SELECT clause
        select_part = "SELECT"
        if query_def.select_clause.distinct:
            select_part += " DISTINCT"
        
        columns_str = ", ".join(query_def.select_clause.columns)
        sql_parts.append(f"{select_part} {columns_str}")
        
        # FROM clause
        from_part = "FROM"
        if query_def.from_clause.database:
            table_ref = f"`{query_def.from_clause.database}`.`{query_def.from_clause.table_name}`"
        else:
            table_ref = f"`{query_def.from_clause.table_name}`"
        
        if query_def.from_clause.alias:
            table_ref += f" AS {query_def.from_clause.alias}"
        
        if query_def.from_clause.sample_ratio:
            table_ref += f" SAMPLE {query_def.from_clause.sample_ratio}"
        
        sql_parts.append(f"{from_part} {table_ref}")
        
        # JOIN clauses
        for join in query_def.joins:
            join_part = join.join_type.value
            
            if join.table.database:
                join_table = f"`{join.table.database}`.`{join.table.table_name}`"
            else:
                join_table = f"`{join.table.table_name}`"
            
            if join.table.alias:
                join_table += f" AS {join.table.alias}"
            
            join_part += f" {join_table}"
            
            if join.using_columns:
                using_cols = ", ".join([f"`{col}`" for col in join.using_columns])
                join_part += f" USING ({using_cols})"
            elif join.on_condition:
                join_part += f" ON {join.on_condition}"
            
            sql_parts.append(join_part)
        
        # WHERE clause
        if query_def.where_clause:
            where_conditions = " AND ".join(query_def.where_clause.conditions)
            sql_parts.append(f"WHERE {where_conditions}")
        
        # GROUP BY clause
        if query_def.group_by_clause:
            group_by_part = "GROUP BY " + ", ".join(query_def.group_by_clause.columns)
            
            if query_def.group_by_clause.with_rollup:
                group_by_part += " WITH ROLLUP"
            elif query_def.group_by_clause.with_cube:
                group_by_part += " WITH CUBE"
            
            if query_def.group_by_clause.with_totals:
                group_by_part += " WITH TOTALS"
            
            sql_parts.append(group_by_part)
        
        # HAVING clause
        if query_def.having_clause:
            having_conditions = " AND ".join(query_def.having_clause.conditions)
            sql_parts.append(f"HAVING {having_conditions}")
        
        # ORDER BY clause
        if query_def.order_by_clause:
            order_items = []
            for i, col in enumerate(query_def.order_by_clause.columns):
                direction = query_def.order_by_clause.directions[i] if i < len(query_def.order_by_clause.directions) else "ASC"
                order_items.append(f"{col} {direction}")
            
            sql_parts.append(f"ORDER BY {', '.join(order_items)}")
        
        # LIMIT clause
        if query_def.select_clause.limit:
            limit_part = f"LIMIT {query_def.select_clause.limit}"
            if query_def.select_clause.offset:
                limit_part += f" OFFSET {query_def.select_clause.offset}"
            sql_parts.append(limit_part)
        
        # SETTINGS
        if query_def.settings:
            settings_str = ", ".join([f"{k} = {v}" for k, v in query_def.settings.items()])
            sql_parts.append(f"SETTINGS {settings_str}")
        
        return "\n".join(sql_parts) + ";"
    
    def create_aggregate_query(self, table: str, database: str, 
                              aggregate_columns: Dict[str, AggregateFunction],
                              group_by_columns: List[str] = None,
                              where_conditions: List[str] = None) -> str:
        """创建聚合查询"""
        # 构建聚合列
        select_columns = []
        
        if group_by_columns:
            select_columns.extend(group_by_columns)
        
        for col_name, agg_func in aggregate_columns.items():
            if agg_func == AggregateFunction.COUNT:
                if col_name == "*":
                    select_columns.append("count(*) AS total_count")
                else:
                    select_columns.append(f"count(`{col_name}`) AS {col_name}_count")
            elif agg_func == AggregateFunction.UNIQ:
                select_columns.append(f"uniq(`{col_name}`) AS {col_name}_uniq")
            elif agg_func == AggregateFunction.GROUP_ARRAY:
                select_columns.append(f"groupArray(`{col_name}`) AS {col_name}_array")
            else:
                select_columns.append(f"{agg_func.value}(`{col_name}`) AS {col_name}_{agg_func.value}")
        
        # Build the query definition
        query_def = QueryDefinition(
            select_clause=SelectClause(columns=select_columns),
            from_clause=FromClause(table_name=table, database=database),
            query_type=QueryType.AGGREGATE
        )
        
        if where_conditions:
            query_def.where_clause = WhereClause(conditions=where_conditions)
        
        if group_by_columns:
            query_def.group_by_clause = GroupByClause(columns=group_by_columns)
        
        return self.build_select_query(query_def)
    
    def create_window_query(self, table: str, database: str,
                           window_functions: Dict[str, WindowFunction],
                           partition_by: List[str] = None,
                           order_by: List[str] = None,
                           where_conditions: List[str] = None) -> str:
        """创建窗口函数查询"""
        select_columns = ["*"]  # 包含原始列
        
        # 构建窗口函数
        for alias, win_func in window_functions.items():
            window_part = f"{win_func.value}()"
            
            over_clause_parts = []
            if partition_by:
                over_clause_parts.append(f"PARTITION BY {', '.join(partition_by)}")
            if order_by:
                over_clause_parts.append(f"ORDER BY {', '.join(order_by)}")
            
            if over_clause_parts:
                window_part += f" OVER ({' '.join(over_clause_parts)})"
            else:
                window_part += " OVER ()"
            
            select_columns.append(f"{window_part} AS {alias}")
        
        # Build the query definition
        query_def = QueryDefinition(
            select_clause=SelectClause(columns=select_columns),
            from_clause=FromClause(table_name=table, database=database),
            query_type=QueryType.WINDOW
        )
        
        if where_conditions:
            query_def.where_clause = WhereClause(conditions=where_conditions)
        
        return self.build_select_query(query_def)
    
    def create_join_query(self, main_table: FromClause, join_tables: List[JoinClause],
                         select_columns: List[str], where_conditions: List[str] = None) -> str:
        """创建连接查询"""
        query_def = QueryDefinition(
            select_clause=SelectClause(columns=select_columns),
            from_clause=main_table,
            joins=join_tables,
            query_type=QueryType.JOIN
        )
        
        if where_conditions:
            query_def.where_clause = WhereClause(conditions=where_conditions)
        
        return self.build_select_query(query_def)
    
    def create_cte_query(self, cte_definitions: Dict[str, str], main_query: str) -> str:
        """创建CTE(公共表表达式)查询"""
        cte_parts = []
        
        for cte_name, cte_sql in cte_definitions.items():
            cte_parts.append(f"{cte_name} AS (\n    {cte_sql}\n)")
        
        cte_clause = "WITH " + ",\n".join(cte_parts)
        
        return f"{cte_clause}\n\n{main_query}"
    
    def create_union_query(self, queries: List[str], union_all: bool = False) -> str:
        """创建UNION查询"""
        union_operator = "UNION ALL" if union_all else "UNION"
        return f"\n{union_operator}\n".join(queries)
    
    def generate_sample_queries(self, database: str, table: str) -> Dict[str, str]:
        """生成示例查询"""
        queries = {}
        
        # 基础查询
        queries["basic_select"] = f"""-- 基础查询
SELECT *
FROM `{database}`.`{table}`
LIMIT 10;"""
        
        # Conditional query
        queries["conditional_select"] = f"""-- Conditional query
SELECT id, username, email, age
FROM `{database}`.`{table}`
WHERE age > 25 
  AND status = 'active'
  AND created_at >= today() - INTERVAL 30 DAY
ORDER BY created_at DESC
LIMIT 100;"""
        
        # Aggregate query
        queries["aggregate_query"] = f"""-- Aggregate query
SELECT 
    status,
    count(*) AS user_count,
    avg(age) AS avg_age,
    min(age) AS min_age,
    max(age) AS max_age,
    uniq(email) AS unique_emails
FROM `{database}`.`{table}`
GROUP BY status
ORDER BY user_count DESC;"""
        
        # Time-series query
        queries["time_series"] = f"""-- Time-series query
SELECT 
    toYYYYMM(created_at) AS month,
    count(*) AS new_users,
    uniq(email) AS unique_users,
    avg(age) AS avg_age
FROM `{database}`.`{table}`
WHERE created_at >= today() - INTERVAL 12 MONTH
GROUP BY month
ORDER BY month;"""
        
        # Window-function query
        queries["window_function"] = f"""-- Window-function query
SELECT 
    id,
    username,
    age,
    status,
    row_number() OVER (PARTITION BY status ORDER BY age DESC) AS rank_in_status,
    lagInFrame(age, 1) OVER (ORDER BY created_at) AS prev_user_age,
    avg(age) OVER (PARTITION BY status) AS avg_age_by_status
FROM `{database}`.`{table}`
ORDER BY status, rank_in_status;"""
        
        # Complex query
        queries["complex_query"] = f"""-- Complex query (CTE + window functions)
WITH user_stats AS (
    SELECT 
        status,
        count(*) AS user_count,
        avg(age) AS avg_age
    FROM `{database}`.`{table}`
    GROUP BY status
),
user_rankings AS (
    SELECT 
        id,
        username,
        age,
        status,
        dense_rank() OVER (PARTITION BY status ORDER BY age DESC) AS age_rank
    FROM `{database}`.`{table}`
)
SELECT 
    ur.status,
    ur.username,
    ur.age,
    ur.age_rank,
    us.user_count,
    us.avg_age,
    (ur.age - us.avg_age) AS age_diff_from_avg
FROM user_rankings ur
JOIN user_stats us ON ur.status = us.status
WHERE ur.age_rank <= 5
ORDER BY ur.status, ur.age_rank;"""
        
        return queries
    
    def analyze_query_performance(self, query: str) -> Dict[str, Any]:
        """分析查询性能"""
        analysis = {
            "query_type": "unknown",
            "complexity_score": 0,
            "performance_score": 0,
            "recommendations": [],
            "potential_issues": []
        }
        
        query_lower = query.lower()
        
        # Determine the query type
        if "group by" in query_lower:
            analysis["query_type"] = "aggregate"
            analysis["complexity_score"] += 3
        
        if "join" in query_lower:
            analysis["query_type"] = "join"
            analysis["complexity_score"] += 4
            join_count = query_lower.count("join")
            if join_count > 3:
                analysis["potential_issues"].append(f"多表连接({join_count}个)可能影响性能")
        
        if "over (" in query_lower:
            analysis["query_type"] = "window"
            analysis["complexity_score"] += 5
        
        if "with " in query_lower:
            analysis["complexity_score"] += 2
            analysis["recommendations"].append("CTE可以提高查询可读性")
        
        # Performance checks
        if "select *" in query_lower:
            analysis["performance_score"] -= 2
            analysis["potential_issues"].append("SELECT * may hurt performance")
            analysis["recommendations"].append("Select only the columns you need")
        
        if "limit" not in query_lower and "count(" not in query_lower:
            analysis["performance_score"] -= 1
            analysis["recommendations"].append("考虑添加LIMIT限制结果集大小")
        
        if "where" in query_lower:
            analysis["performance_score"] += 2
        else:
            analysis["potential_issues"].append("缺少WHERE条件可能导致全表扫描")
        
        if "order by" in query_lower and "limit" not in query_lower:
            analysis["potential_issues"].append("ORDER BY without LIMIT可能消耗大量内存")
        
        # Compute the final performance score
        base_score = 7
        analysis["performance_score"] = max(1, min(10, base_score + analysis["performance_score"]))
        
        return analysis
    
    def get_function_examples(self, function_category: str) -> List[Dict[str, str]]:
        """获取函数使用示例"""
        examples = []
        
        if function_category == "string_functions":
            examples = [
                {"function": "length", "example": "SELECT length('ClickHouse') AS len", "result": "10"},
                {"function": "concat", "example": "SELECT concat('Hello', ' ', 'World') AS greeting", "result": "Hello World"},
                {"function": "substring", "example": "SELECT substring('ClickHouse', 1, 5) AS sub", "result": "Click"},
                {"function": "replace", "example": "SELECT replace('Hello World', 'World', 'ClickHouse') AS replaced", "result": "Hello ClickHouse"}
            ]
        
        elif function_category == "date_functions":
            examples = [
                {"function": "now", "example": "SELECT now() AS current_time", "result": "2024-01-15 10:30:00"},
                {"function": "toYYYYMM", "example": "SELECT toYYYYMM(now()) AS year_month", "result": "202401"},
                {"function": "addDays", "example": "SELECT addDays(today(), 7) AS next_week", "result": "2024-01-22"},
                {"function": "dateDiff", "example": "SELECT dateDiff('day', '2024-01-01', '2024-01-15') AS days", "result": "14"}
            ]
        
        elif function_category == "math_functions":
            examples = [
                {"function": "round", "example": "SELECT round(3.14159, 2) AS rounded", "result": "3.14"},
                {"function": "greatest", "example": "SELECT greatest(10, 20, 5) AS max_val", "result": "20"},
                {"function": "abs", "example": "SELECT abs(-42) AS absolute", "result": "42"},
                {"function": "sqrt", "example": "SELECT sqrt(16) AS square_root", "result": "4"}
            ]
        
        elif function_category == "array_functions":
            examples = [
                {"function": "arrayLength", "example": "SELECT arrayLength([1,2,3,4]) AS len", "result": "4"},
                {"function": "arrayElement", "example": "SELECT arrayElement([1,2,3], 2) AS element", "result": "2"},
                {"function": "arrayConcat", "example": "SELECT arrayConcat([1,2], [3,4]) AS combined", "result": "[1,2,3,4]"},
                {"function": "arrayUniq", "example": "SELECT arrayUniq([1,2,2,3]) AS unique_count", "result": "3"}
            ]
        
        return examples

# DQL operation examples
print("\n=== ClickHouse DQL Operation Management ===")

dql_manager = ClickHouseDQLManager()

print("\n1. 基础查询构建:")
# 简单查询
simple_query = QueryDefinition(
    select_clause=SelectClause(columns=["id", "username", "email", "age"]),
    from_clause=FromClause(table_name="users", database="analytics"),
    where_clause=WhereClause(conditions=["age > 25", "status = 'active'"]),
    order_by_clause=OrderByClause(columns=["created_at"], directions=["DESC"]),
    query_type=QueryType.SELECT
)

simple_sql = dql_manager.build_select_query(simple_query)
print(f"\n   简单查询:")
print(f"     {simple_sql[:150]}...")

print("\n2. 聚合查询:")
agg_sql = dql_manager.create_aggregate_query(
    table="users",
    database="analytics",
    aggregate_columns={
        "*": AggregateFunction.COUNT,
        "age": AggregateFunction.AVG,
        "email": AggregateFunction.UNIQ
    },
    group_by_columns=["status"],
    where_conditions=["created_at >= today() - INTERVAL 30 DAY"]
)
print(f"\n   聚合查询:")
print(f"     {agg_sql[:200]}...")

print("\n3. 窗口函数查询:")
window_sql = dql_manager.create_window_query(
    table="users",
    database="analytics",
    window_functions={
        "row_num": WindowFunction.ROW_NUMBER,
        "age_rank": WindowFunction.RANK
    },
    partition_by=["status"],
    order_by=["age DESC"]
)
print(f"\n   窗口函数查询:")
print(f"     {window_sql[:200]}...")

print("\n4. 连接查询:")
# 定义连接表
orders_table = FromClause(table_name="orders", database="analytics", alias="o")
user_join = JoinClause(
    join_type=JoinType.INNER,
    table=FromClause(table_name="users", database="analytics", alias="u"),
    on_condition="o.user_id = u.id"
)

join_sql = dql_manager.create_join_query(
    main_table=orders_table,
    join_tables=[user_join],
    select_columns=["o.id", "o.total_amount", "u.username", "u.email"],
    where_conditions=["o.created_at >= today() - INTERVAL 7 DAY"]
)
print(f"\n   连接查询:")
print(f"     {join_sql[:200]}...")

print("\n5. CTE查询:")
cte_definitions = {
    "active_users": "SELECT * FROM analytics.users WHERE status = 'active'",
    "recent_orders": "SELECT * FROM analytics.orders WHERE created_at >= today() - INTERVAL 30 DAY"
}
main_query = "SELECT au.username, COUNT(ro.id) as order_count FROM active_users au LEFT JOIN recent_orders ro ON au.id = ro.user_id GROUP BY au.username"

cte_sql = dql_manager.create_cte_query(cte_definitions, main_query)
print(f"\n   CTE查询:")
print(f"     {cte_sql[:200]}...")

print("\n6. 示例查询生成:")
sample_queries = dql_manager.generate_sample_queries("analytics", "users")
for query_name, query_sql in list(sample_queries.items())[:3]:
    print(f"\n   {query_name}:")
    print(f"     {query_sql[:150]}...")

print("\n7. 查询性能分析:")
test_queries = [
    "SELECT * FROM users",
    "SELECT id, username FROM users WHERE age > 25 LIMIT 100",
    "SELECT status, COUNT(*) FROM users GROUP BY status",
    "SELECT u.*, o.total FROM users u JOIN orders o ON u.id = o.user_id"
]

for i, query in enumerate(test_queries, 1):
    analysis = dql_manager.analyze_query_performance(query)
    print(f"\n   查询 {i}: {query[:50]}...")
    print(f"     类型: {analysis['query_type']}")
    print(f"     复杂度: {analysis['complexity_score']}/10")
    print(f"     性能评分: {analysis['performance_score']}/10")
    if analysis['recommendations']:
        print(f"     建议: {analysis['recommendations'][0]}")
    if analysis['potential_issues']:
        print(f"     潜在问题: {analysis['potential_issues'][0]}")

print("\n8. 函数使用示例:")
for category in ["string_functions", "date_functions"]:
    examples = dql_manager.get_function_examples(category)
    print(f"\n   {category.replace('_', ' ').title()}:")
    for example in examples[:2]:
        print(f"     {example['function']}: {example['example']} → {example['result']}")

print("\n9. UNION查询:")
union_queries = [
    "SELECT 'active' as type, COUNT(*) as count FROM analytics.users WHERE status = 'active'",
    "SELECT 'inactive' as type, COUNT(*) as count FROM analytics.users WHERE status = 'inactive'"
]

union_sql = dql_manager.create_union_query(union_queries, union_all=False)
print(f"\n   UNION查询:")
print(f"     {union_sql[:200]}...")

print("\n10. 采样查询:")
sample_query = QueryDefinition(
    select_clause=SelectClause(columns=["status", "COUNT(*) as sample_count"]),
    from_clause=FromClause(table_name="users", database="analytics", sample_ratio=0.1),
    group_by_clause=GroupByClause(columns=["status"])
)

sample_sql = dql_manager.build_select_query(sample_query)
print(f"\n   采样查询 (10%):")
print(f"     {sample_sql}")

Summary

Key Points

  1. DDL operations

    • Creating, altering, and dropping databases and tables
    • Dynamic adjustment of table structures
    • Support for multiple table engines and partitioning strategies
  2. DML operations

    • Multiple insert formats (VALUES, CSV, JSON, Parquet, etc.)
    • Batch and streaming insert optimizations
    • Importing data from external data sources
  3. DQL operations

    • Powerful SQL querying capabilities
    • A rich set of aggregate and window functions
    • High-performance joins and subqueries
    • Advanced query features such as CTEs and UNION

Best Practices

  1. Table design

    • Choose an appropriate table engine
    • Design partition keys and sorting keys sensibly
    • Consider data compression and storage optimization
  2. Data insertion

    • Use batch inserts to improve performance
    • Choose an appropriate data format
    • Avoid frequent small-batch inserts
  3. Query optimization

    • Select only the columns you need; avoid SELECT *
    • Filter data with appropriate WHERE conditions
    • Use LIMIT to bound the result set
    • Use sampling queries for data exploration
  4. Performance monitoring (see the sketch after this list)

    • Analyze query execution plans
    • Monitor query performance metrics
    • Identify and optimize slow queries
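
For the performance-monitoring practice above, the system.query_log table is the usual starting point. A minimal sketch for surfacing recent slow queries, assuming clickhouse-connect and that query logging is enabled (the default):

# Minimal sketch: find recent slow queries via system.query_log.
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", port=8123)

slow = client.query("""
    SELECT query, query_duration_ms, read_rows, memory_usage
    FROM system.query_log
    WHERE type = 'QueryFinish'
      AND event_time >= now() - INTERVAL 1 DAY
      AND query_duration_ms > 1000
    ORDER BY query_duration_ms DESC
    LIMIT 10
""").result_rows

for query, duration_ms, rows_read, memory in slow:
    print(f"{duration_ms} ms, {rows_read} rows read: {query[:80]}")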

Next Steps

  1. Table engines in depth - explore the characteristics and use cases of each table engine
  2. Query optimization - learn performance tuning techniques and best practices
  3. Advanced data types - master advanced usage of complex data types
  4. Practical application - apply ClickHouse to data analysis in real projects

Having worked through this chapter, you have mastered the basic SQL operations of ClickHouse and laid a solid foundation for the advanced topics that follow.