数据源管理与数据建模是低代码平台的核心基础设施,负责统一管理各种数据源的连接、配置和访问,以及提供灵活的数据建模能力。本章将详细介绍如何设计和实现一个功能完整的数据源管理与数据建模系统。
11.1 数据源管理架构概览
11.1.1 数据源管理器接口
// 数据源管理器接口
interface DataSourceManager {
// 数据源管理
createDataSource(config: DataSourceConfig): Promise<DataSource>;
updateDataSource(id: string, config: Partial<DataSourceConfig>): Promise<DataSource>;
deleteDataSource(id: string): Promise<void>;
getDataSource(id: string): Promise<DataSource | null>;
queryDataSources(query: DataSourceQuery): Promise<DataSource[]>;
// 连接管理
testConnection(config: DataSourceConfig): Promise<ConnectionTestResult>;
getConnection(id: string): Promise<Connection | null>;
closeConnection(id: string): Promise<void>;
// 数据操作
executeQuery(dataSourceId: string, query: QueryRequest): Promise<QueryResult>;
executeCommand(dataSourceId: string, command: CommandRequest): Promise<CommandResult>;
// 模式发现
discoverSchema(dataSourceId: string): Promise<SchemaInfo>;
refreshSchema(dataSourceId: string): Promise<SchemaInfo>;
// 事件处理
on(event: string, listener: Function): void;
off(event: string, listener: Function): void;
emit(event: string, data: any): void;
}
// 低代码数据源管理器实现
class LowCodeDataSourceManager implements DataSourceManager {
private dataSources: Map<string, DataSource> = new Map();
private connections: Map<string, Connection> = new Map();
private connectionPools: Map<string, ConnectionPool> = new Map();
private schemaCache: Map<string, SchemaInfo> = new Map();
private eventBus: EventBus;
private encryptionService: EncryptionService;
private auditLogger: AuditLogger;
private options: DataSourceManagerOptions;
constructor(options: DataSourceManagerOptions) {
this.options = options;
this.eventBus = options.eventBus || new EventBus();
this.encryptionService = options.encryptionService;
this.auditLogger = options.auditLogger;
this.initializeBuiltinDataSources();
}
private initializeBuiltinDataSources(): void {
// 初始化内置数据源类型
this.registerDataSourceType('mysql', new MySQLDataSourceAdapter());
this.registerDataSourceType('postgresql', new PostgreSQLDataSourceAdapter());
this.registerDataSourceType('mongodb', new MongoDBDataSourceAdapter());
this.registerDataSourceType('redis', new RedisDataSourceAdapter());
this.registerDataSourceType('elasticsearch', new ElasticsearchDataSourceAdapter());
this.registerDataSourceType('rest', new RESTDataSourceAdapter());
this.registerDataSourceType('graphql', new GraphQLDataSourceAdapter());
}
async createDataSource(config: DataSourceConfig): Promise<DataSource> {
// 验证配置
this.validateDataSourceConfig(config);
// 加密敏感信息
const encryptedConfig = await this.encryptSensitiveData(config);
// 创建数据源
const dataSource: DataSource = {
id: generateId(),
name: config.name,
type: config.type,
config: encryptedConfig,
status: DataSourceStatus.INACTIVE,
createdAt: new Date(),
updatedAt: new Date(),
metadata: config.metadata || {}
};
// 测试连接
const testResult = await this.testConnection(config);
if (!testResult.success) {
throw new Error(`数据源连接测试失败: ${testResult.error}`);
}
dataSource.status = DataSourceStatus.ACTIVE;
this.dataSources.set(dataSource.id, dataSource);
// 记录审计日志
await this.auditLogger.log({
action: 'datasource:create',
details: { dataSourceId: dataSource.id, name: dataSource.name, type: dataSource.type },
timestamp: new Date()
});
// 触发事件
this.emit('datasource:created', { dataSource });
return dataSource;
}
async updateDataSource(id: string, config: Partial<DataSourceConfig>): Promise<DataSource> {
const dataSource = this.dataSources.get(id);
if (!dataSource) {
throw new Error(`数据源不存在: ${id}`);
}
// 更新配置
const updatedConfig = { ...dataSource.config, ...config };
// 如果配置发生变化,重新测试连接
if (config.host || config.port || config.database || config.username || config.password) {
const testResult = await this.testConnection(updatedConfig as DataSourceConfig);
if (!testResult.success) {
throw new Error(`数据源连接测试失败: ${testResult.error}`);
}
// 关闭旧连接
await this.closeConnection(id);
}
// 加密敏感信息
const encryptedConfig = await this.encryptSensitiveData(updatedConfig as DataSourceConfig);
dataSource.config = encryptedConfig;
dataSource.updatedAt = new Date();
if (config.name) dataSource.name = config.name;
if (config.metadata) dataSource.metadata = { ...dataSource.metadata, ...config.metadata };
// 记录审计日志
await this.auditLogger.log({
action: 'datasource:update',
details: { dataSourceId: id, changes: Object.keys(config) },
timestamp: new Date()
});
// 触发事件
this.emit('datasource:updated', { dataSource, changes: config });
return dataSource;
}
async deleteDataSource(id: string): Promise<void> {
const dataSource = this.dataSources.get(id);
if (!dataSource) {
throw new Error(`数据源不存在: ${id}`);
}
// 关闭连接
await this.closeConnection(id);
// 清理缓存
this.schemaCache.delete(id);
// 删除数据源
this.dataSources.delete(id);
// 记录审计日志
await this.auditLogger.log({
action: 'datasource:delete',
details: { dataSourceId: id, name: dataSource.name },
timestamp: new Date()
});
// 触发事件
this.emit('datasource:deleted', { dataSource });
}
async getDataSource(id: string): Promise<DataSource | null> {
const dataSource = this.dataSources.get(id);
if (!dataSource) {
return null;
}
// 解密敏感信息(如果需要)
const decryptedConfig = await this.decryptSensitiveData(dataSource.config);
return {
...dataSource,
config: decryptedConfig
};
}
async queryDataSources(query: DataSourceQuery): Promise<DataSource[]> {
let results = Array.from(this.dataSources.values());
// 应用过滤条件
if (query.type) {
results = results.filter(ds => ds.type === query.type);
}
if (query.status) {
results = results.filter(ds => ds.status === query.status);
}
if (query.search) {
const searchLower = query.search.toLowerCase();
results = results.filter(ds =>
ds.name.toLowerCase().includes(searchLower) ||
ds.type.toLowerCase().includes(searchLower)
);
}
// 排序
if (query.sortBy) {
results.sort((a, b) => {
const aValue = (a as any)[query.sortBy!];
const bValue = (b as any)[query.sortBy!];
if (query.sortOrder === 'desc') {
return bValue > aValue ? 1 : -1;
}
return aValue > bValue ? 1 : -1;
});
}
// 分页
const offset = query.offset || 0;
const limit = query.limit || 50;
return results.slice(offset, offset + limit);
}
async testConnection(config: DataSourceConfig): Promise<ConnectionTestResult> {
try {
const adapter = this.getDataSourceAdapter(config.type);
const result = await adapter.testConnection(config);
return {
success: true,
latency: result.latency,
version: result.version,
features: result.features
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
async getConnection(id: string): Promise<Connection | null> {
return this.connections.get(id) || null;
}
async closeConnection(id: string): Promise<void> {
const connection = this.connections.get(id);
if (connection) {
await connection.close();
this.connections.delete(id);
}
const pool = this.connectionPools.get(id);
if (pool) {
await pool.close();
this.connectionPools.delete(id);
}
}
async executeQuery(dataSourceId: string, query: QueryRequest): Promise<QueryResult> {
const dataSource = this.dataSources.get(dataSourceId);
if (!dataSource) {
throw new Error(`数据源不存在: ${dataSourceId}`);
}
const adapter = this.getDataSourceAdapter(dataSource.type);
const connection = await this.getOrCreateConnection(dataSource);
try {
const result = await adapter.executeQuery(connection, query);
// 记录审计日志
await this.auditLogger.log({
action: 'datasource:query',
details: {
dataSourceId,
queryType: query.type,
rowCount: result.rows?.length || 0
},
timestamp: new Date()
});
return result;
} catch (error) {
// 记录错误日志
await this.auditLogger.log({
action: 'datasource:query:error',
details: { dataSourceId, error: error.message },
timestamp: new Date()
});
throw error;
}
}
async executeCommand(dataSourceId: string, command: CommandRequest): Promise<CommandResult> {
const dataSource = this.dataSources.get(dataSourceId);
if (!dataSource) {
throw new Error(`数据源不存在: ${dataSourceId}`);
}
const adapter = this.getDataSourceAdapter(dataSource.type);
const connection = await this.getOrCreateConnection(dataSource);
try {
const result = await adapter.executeCommand(connection, command);
// 记录审计日志
await this.auditLogger.log({
action: 'datasource:command',
details: {
dataSourceId,
commandType: command.type,
affectedRows: result.affectedRows
},
timestamp: new Date()
});
return result;
} catch (error) {
// 记录错误日志
await this.auditLogger.log({
action: 'datasource:command:error',
details: { dataSourceId, error: error.message },
timestamp: new Date()
});
throw error;
}
}
async discoverSchema(dataSourceId: string): Promise<SchemaInfo> {
const dataSource = this.dataSources.get(dataSourceId);
if (!dataSource) {
throw new Error(`数据源不存在: ${dataSourceId}`);
}
// 检查缓存
const cached = this.schemaCache.get(dataSourceId);
if (cached && Date.now() - cached.lastUpdated.getTime() < this.options.schemaCacheTTL) {
return cached;
}
const adapter = this.getDataSourceAdapter(dataSource.type);
const connection = await this.getOrCreateConnection(dataSource);
try {
const schema = await adapter.discoverSchema(connection);
schema.lastUpdated = new Date();
// 缓存模式信息
this.schemaCache.set(dataSourceId, schema);
// 触发事件
this.emit('schema:discovered', { dataSourceId, schema });
return schema;
} catch (error) {
throw new Error(`模式发现失败: ${error.message}`);
}
}
async refreshSchema(dataSourceId: string): Promise<SchemaInfo> {
// 清除缓存
this.schemaCache.delete(dataSourceId);
// 重新发现模式
return this.discoverSchema(dataSourceId);
}
// 事件处理
on(event: string, listener: Function): void {
this.eventBus.on(event, listener);
}
off(event: string, listener: Function): void {
this.eventBus.off(event, listener);
}
emit(event: string, data: any): void {
this.eventBus.emit(event, data);
}
// 私有方法
private validateDataSourceConfig(config: DataSourceConfig): void {
if (!config.name || !config.type) {
throw new Error('数据源名称和类型不能为空');
}
if (!this.isValidDataSourceType(config.type)) {
throw new Error(`不支持的数据源类型: ${config.type}`);
}
}
private async encryptSensitiveData(config: DataSourceConfig): Promise<DataSourceConfig> {
const encrypted = { ...config };
if (config.password) {
encrypted.password = await this.encryptionService.encrypt(config.password);
}
if (config.apiKey) {
encrypted.apiKey = await this.encryptionService.encrypt(config.apiKey);
}
return encrypted;
}
private async decryptSensitiveData(config: DataSourceConfig): Promise<DataSourceConfig> {
const decrypted = { ...config };
if (config.password) {
decrypted.password = await this.encryptionService.decrypt(config.password);
}
if (config.apiKey) {
decrypted.apiKey = await this.encryptionService.decrypt(config.apiKey);
}
return decrypted;
}
private async getOrCreateConnection(dataSource: DataSource): Promise<Connection> {
let connection = this.connections.get(dataSource.id);
if (!connection || !connection.isConnected()) {
const adapter = this.getDataSourceAdapter(dataSource.type);
const decryptedConfig = await this.decryptSensitiveData(dataSource.config);
connection = await adapter.createConnection(decryptedConfig);
this.connections.set(dataSource.id, connection);
}
return connection;
}
private getDataSourceAdapter(type: string): DataSourceAdapter {
const adapter = this.dataSourceAdapters.get(type);
if (!adapter) {
throw new Error(`不支持的数据源类型: ${type}`);
}
return adapter;
}
private isValidDataSourceType(type: string): boolean {
return this.dataSourceAdapters.has(type);
}
private registerDataSourceType(type: string, adapter: DataSourceAdapter): void {
this.dataSourceAdapters.set(type, adapter);
}
private dataSourceAdapters: Map<string, DataSourceAdapter> = new Map();
}
11.2 核心数据结构
11.2.1 数据源相关接口
// 数据源配置
interface DataSourceConfig {
name: string;
type: string;
host?: string;
port?: number;
database?: string;
username?: string;
password?: string;
apiKey?: string;
url?: string;
options?: Record<string, any>;
metadata?: Record<string, any>;
}
// 数据源
interface DataSource {
id: string;
name: string;
type: string;
config: DataSourceConfig;
status: DataSourceStatus;
createdAt: Date;
updatedAt: Date;
metadata: Record<string, any>;
}
// 数据源状态
enum DataSourceStatus {
ACTIVE = 'active',
INACTIVE = 'inactive',
ERROR = 'error',
CONNECTING = 'connecting'
}
// 数据源查询
interface DataSourceQuery {
type?: string;
status?: DataSourceStatus;
search?: string;
sortBy?: string;
sortOrder?: 'asc' | 'desc';
limit?: number;
offset?: number;
}
// 数据源管理器选项
interface DataSourceManagerOptions {
eventBus?: EventBus;
encryptionService: EncryptionService;
auditLogger: AuditLogger;
schemaCacheTTL: number;
connectionTimeout: number;
maxConnections: number;
}
// 连接测试结果
interface ConnectionTestResult {
success: boolean;
error?: string;
latency?: number;
version?: string;
features?: string[];
}
// 连接接口
interface Connection {
id: string;
dataSourceId: string;
isConnected(): boolean;
close(): Promise<void>;
execute(query: string, params?: any[]): Promise<any>;
}
// 连接池接口
interface ConnectionPool {
getConnection(): Promise<Connection>;
releaseConnection(connection: Connection): Promise<void>;
close(): Promise<void>;
getStats(): PoolStats;
}
// 连接池统计
interface PoolStats {
totalConnections: number;
activeConnections: number;
idleConnections: number;
waitingRequests: number;
}
11.2.2 查询和命令相关接口
// 查询请求
interface QueryRequest {
type: QueryType;
sql?: string;
collection?: string;
filter?: Record<string, any>;
projection?: Record<string, any>;
sort?: Record<string, any>;
limit?: number;
offset?: number;
parameters?: any[];
}
// 查询类型
enum QueryType {
SELECT = 'select',
INSERT = 'insert',
UPDATE = 'update',
DELETE = 'delete',
AGGREGATE = 'aggregate',
CUSTOM = 'custom'
}
// 查询结果
interface QueryResult {
rows?: any[];
columns?: ColumnInfo[];
totalCount?: number;
executionTime: number;
metadata?: Record<string, any>;
}
// 列信息
interface ColumnInfo {
name: string;
type: string;
nullable: boolean;
defaultValue?: any;
isPrimaryKey?: boolean;
isForeignKey?: boolean;
maxLength?: number;
}
// 命令请求
interface CommandRequest {
type: CommandType;
sql?: string;
script?: string;
parameters?: any[];
options?: Record<string, any>;
}
// 命令类型
enum CommandType {
DDL = 'ddl',
DML = 'dml',
SCRIPT = 'script',
PROCEDURE = 'procedure',
FUNCTION = 'function'
}
// 命令结果
interface CommandResult {
success: boolean;
affectedRows?: number;
insertId?: any;
executionTime: number;
message?: string;
warnings?: string[];
}
11.2.3 模式发现相关接口
// 模式信息
interface SchemaInfo {
database: string;
tables: TableInfo[];
views: ViewInfo[];
procedures: ProcedureInfo[];
functions: FunctionInfo[];
lastUpdated: Date;
}
// 表信息
interface TableInfo {
name: string;
schema?: string;
columns: ColumnInfo[];
primaryKeys: string[];
foreignKeys: ForeignKeyInfo[];
indexes: IndexInfo[];
constraints: ConstraintInfo[];
rowCount?: number;
size?: number;
comment?: string;
}
// 视图信息
interface ViewInfo {
name: string;
schema?: string;
columns: ColumnInfo[];
definition: string;
comment?: string;
}
// 存储过程信息
interface ProcedureInfo {
name: string;
schema?: string;
parameters: ParameterInfo[];
returnType?: string;
definition: string;
comment?: string;
}
// 函数信息
interface FunctionInfo {
name: string;
schema?: string;
parameters: ParameterInfo[];
returnType: string;
definition: string;
comment?: string;
}
// 参数信息
interface ParameterInfo {
name: string;
type: string;
direction: 'IN' | 'OUT' | 'INOUT';
defaultValue?: any;
nullable: boolean;
}
// 外键信息
interface ForeignKeyInfo {
name: string;
columns: string[];
referencedTable: string;
referencedColumns: string[];
onUpdate: string;
onDelete: string;
}
// 索引信息
interface IndexInfo {
name: string;
columns: string[];
isUnique: boolean;
isPrimary: boolean;
type: string;
}
// 约束信息
interface ConstraintInfo {
name: string;
type: ConstraintType;
columns: string[];
definition: string;
}
// 约束类型
enum ConstraintType {
PRIMARY_KEY = 'primary_key',
FOREIGN_KEY = 'foreign_key',
UNIQUE = 'unique',
CHECK = 'check',
NOT_NULL = 'not_null'
}
11.3 数据源适配器
11.3.1 数据源适配器接口
// 数据源适配器接口
interface DataSourceAdapter {
// 连接管理
testConnection(config: DataSourceConfig): Promise<ConnectionTestResult>;
createConnection(config: DataSourceConfig): Promise<Connection>;
createConnectionPool(config: DataSourceConfig): Promise<ConnectionPool>;
// 查询执行
executeQuery(connection: Connection, query: QueryRequest): Promise<QueryResult>;
executeCommand(connection: Connection, command: CommandRequest): Promise<CommandResult>;
// 模式发现
discoverSchema(connection: Connection): Promise<SchemaInfo>;
// 数据类型映射
mapDataType(nativeType: string): string;
formatValue(value: any, type: string): any;
// 查询构建
buildSelectQuery(table: string, options: SelectOptions): string;
buildInsertQuery(table: string, data: Record<string, any>): string;
buildUpdateQuery(table: string, data: Record<string, any>, where: Record<string, any>): string;
buildDeleteQuery(table: string, where: Record<string, any>): string;
}
// 查询选项
interface SelectOptions {
columns?: string[];
where?: Record<string, any>;
orderBy?: Record<string, 'ASC' | 'DESC'>;
limit?: number;
offset?: number;
joins?: JoinOptions[];
}
// 连接选项
interface JoinOptions {
type: 'INNER' | 'LEFT' | 'RIGHT' | 'FULL';
table: string;
on: string;
}
11.3.2 MySQL 适配器实现
// MySQL 数据源适配器
class MySQLDataSourceAdapter implements DataSourceAdapter {
async testConnection(config: DataSourceConfig): Promise<ConnectionTestResult> {
const startTime = Date.now();
try {
const mysql = require('mysql2/promise');
const connection = await mysql.createConnection({
host: config.host,
port: config.port || 3306,
user: config.username,
password: config.password,
database: config.database,
connectTimeout: 5000
});
const [rows] = await connection.execute('SELECT VERSION() as version');
await connection.end();
return {
success: true,
latency: Date.now() - startTime,
version: rows[0].version,
features: ['transactions', 'foreign_keys', 'stored_procedures']
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
async createConnection(config: DataSourceConfig): Promise<Connection> {
const mysql = require('mysql2/promise');
const nativeConnection = await mysql.createConnection({
host: config.host,
port: config.port || 3306,
user: config.username,
password: config.password,
database: config.database,
...config.options
});
return new MySQLConnection(generateId(), config.database!, nativeConnection);
}
async createConnectionPool(config: DataSourceConfig): Promise<ConnectionPool> {
const mysql = require('mysql2/promise');
const pool = mysql.createPool({
host: config.host,
port: config.port || 3306,
user: config.username,
password: config.password,
database: config.database,
connectionLimit: 10,
queueLimit: 0,
...config.options
});
return new MySQLConnectionPool(pool);
}
async executeQuery(connection: Connection, query: QueryRequest): Promise<QueryResult> {
const startTime = Date.now();
try {
let sql: string;
let params: any[] = [];
switch (query.type) {
case QueryType.SELECT:
sql = this.buildSelectQuery(query.collection!, {
columns: query.projection ? Object.keys(query.projection) : undefined,
where: query.filter,
limit: query.limit,
offset: query.offset
});
break;
case QueryType.CUSTOM:
sql = query.sql!;
params = query.parameters || [];
break;
default:
throw new Error(`不支持的查询类型: ${query.type}`);
}
const [rows, fields] = await connection.execute(sql, params);
const columns: ColumnInfo[] = fields.map((field: any) => ({
name: field.name,
type: this.mapDataType(field.type),
nullable: (field.flags & 1) === 0,
isPrimaryKey: (field.flags & 2) !== 0,
maxLength: field.length
}));
return {
rows: rows as any[],
columns,
executionTime: Date.now() - startTime
};
} catch (error) {
throw new Error(`查询执行失败: ${error.message}`);
}
}
async executeCommand(connection: Connection, command: CommandRequest): Promise<CommandResult> {
const startTime = Date.now();
try {
const result = await connection.execute(command.sql!, command.parameters || []);
return {
success: true,
affectedRows: result.affectedRows,
insertId: result.insertId,
executionTime: Date.now() - startTime
};
} catch (error) {
return {
success: false,
executionTime: Date.now() - startTime,
message: error.message
};
}
}
async discoverSchema(connection: Connection): Promise<SchemaInfo> {
const database = connection.dataSourceId;
// 获取表信息
const tables = await this.discoverTables(connection, database);
// 获取视图信息
const views = await this.discoverViews(connection, database);
// 获取存储过程信息
const procedures = await this.discoverProcedures(connection, database);
// 获取函数信息
const functions = await this.discoverFunctions(connection, database);
return {
database,
tables,
views,
procedures,
functions,
lastUpdated: new Date()
};
}
private async discoverTables(connection: Connection, database: string): Promise<TableInfo[]> {
const [tables] = await connection.execute(`
SELECT TABLE_NAME, TABLE_COMMENT, TABLE_ROWS, DATA_LENGTH
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = ? AND TABLE_TYPE = 'BASE TABLE'
`, [database]);
const tableInfos: TableInfo[] = [];
for (const table of tables as any[]) {
const columns = await this.discoverColumns(connection, database, table.TABLE_NAME);
const primaryKeys = await this.discoverPrimaryKeys(connection, database, table.TABLE_NAME);
const foreignKeys = await this.discoverForeignKeys(connection, database, table.TABLE_NAME);
const indexes = await this.discoverIndexes(connection, database, table.TABLE_NAME);
tableInfos.push({
name: table.TABLE_NAME,
schema: database,
columns,
primaryKeys,
foreignKeys,
indexes,
constraints: [],
rowCount: table.TABLE_ROWS,
size: table.DATA_LENGTH,
comment: table.TABLE_COMMENT
});
}
return tableInfos;
}
private async discoverColumns(connection: Connection, database: string, tableName: string): Promise<ColumnInfo[]> {
const [columns] = await connection.execute(`
SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_DEFAULT,
CHARACTER_MAXIMUM_LENGTH, COLUMN_KEY, COLUMN_COMMENT
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
`, [database, tableName]);
return (columns as any[]).map(col => ({
name: col.COLUMN_NAME,
type: this.mapDataType(col.DATA_TYPE),
nullable: col.IS_NULLABLE === 'YES',
defaultValue: col.COLUMN_DEFAULT,
isPrimaryKey: col.COLUMN_KEY === 'PRI',
isForeignKey: col.COLUMN_KEY === 'MUL',
maxLength: col.CHARACTER_MAXIMUM_LENGTH
}));
}
private async discoverPrimaryKeys(connection: Connection, database: string, tableName: string): Promise<string[]> {
const [keys] = await connection.execute(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND CONSTRAINT_NAME = 'PRIMARY'
ORDER BY ORDINAL_POSITION
`, [database, tableName]);
return (keys as any[]).map(key => key.COLUMN_NAME);
}
private async discoverForeignKeys(connection: Connection, database: string, tableName: string): Promise<ForeignKeyInfo[]> {
const [keys] = await connection.execute(`
SELECT CONSTRAINT_NAME, COLUMN_NAME, REFERENCED_TABLE_NAME,
REFERENCED_COLUMN_NAME, UPDATE_RULE, DELETE_RULE
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE kcu
JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rc
ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
WHERE kcu.TABLE_SCHEMA = ? AND kcu.TABLE_NAME = ?
`, [database, tableName]);
const foreignKeys: ForeignKeyInfo[] = [];
const keyGroups = new Map<string, any[]>();
// 按约束名分组
for (const key of keys as any[]) {
if (!keyGroups.has(key.CONSTRAINT_NAME)) {
keyGroups.set(key.CONSTRAINT_NAME, []);
}
keyGroups.get(key.CONSTRAINT_NAME)!.push(key);
}
// 构建外键信息
for (const [constraintName, keyGroup] of keyGroups) {
foreignKeys.push({
name: constraintName,
columns: keyGroup.map(k => k.COLUMN_NAME),
referencedTable: keyGroup[0].REFERENCED_TABLE_NAME,
referencedColumns: keyGroup.map(k => k.REFERENCED_COLUMN_NAME),
onUpdate: keyGroup[0].UPDATE_RULE,
onDelete: keyGroup[0].DELETE_RULE
});
}
return foreignKeys;
}
private async discoverIndexes(connection: Connection, database: string, tableName: string): Promise<IndexInfo[]> {
const [indexes] = await connection.execute(`
SELECT INDEX_NAME, COLUMN_NAME, NON_UNIQUE, INDEX_TYPE
FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY INDEX_NAME, SEQ_IN_INDEX
`, [database, tableName]);
const indexGroups = new Map<string, any[]>();
// 按索引名分组
for (const index of indexes as any[]) {
if (!indexGroups.has(index.INDEX_NAME)) {
indexGroups.set(index.INDEX_NAME, []);
}
indexGroups.get(index.INDEX_NAME)!.push(index);
}
const indexInfos: IndexInfo[] = [];
// 构建索引信息
for (const [indexName, indexGroup] of indexGroups) {
indexInfos.push({
name: indexName,
columns: indexGroup.map(i => i.COLUMN_NAME),
isUnique: indexGroup[0].NON_UNIQUE === 0,
isPrimary: indexName === 'PRIMARY',
type: indexGroup[0].INDEX_TYPE
});
}
return indexInfos;
}
private async discoverViews(connection: Connection, database: string): Promise<ViewInfo[]> {
const [views] = await connection.execute(`
SELECT TABLE_NAME, VIEW_DEFINITION
FROM INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA = ?
`, [database]);
const viewInfos: ViewInfo[] = [];
for (const view of views as any[]) {
const columns = await this.discoverColumns(connection, database, view.TABLE_NAME);
viewInfos.push({
name: view.TABLE_NAME,
schema: database,
columns,
definition: view.VIEW_DEFINITION
});
}
return viewInfos;
}
private async discoverProcedures(connection: Connection, database: string): Promise<ProcedureInfo[]> {
const [procedures] = await connection.execute(`
SELECT ROUTINE_NAME, ROUTINE_DEFINITION, ROUTINE_COMMENT
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_SCHEMA = ? AND ROUTINE_TYPE = 'PROCEDURE'
`, [database]);
const procedureInfos: ProcedureInfo[] = [];
for (const proc of procedures as any[]) {
const parameters = await this.discoverParameters(connection, database, proc.ROUTINE_NAME);
procedureInfos.push({
name: proc.ROUTINE_NAME,
schema: database,
parameters,
definition: proc.ROUTINE_DEFINITION,
comment: proc.ROUTINE_COMMENT
});
}
return procedureInfos;
}
private async discoverFunctions(connection: Connection, database: string): Promise<FunctionInfo[]> {
const [functions] = await connection.execute(`
SELECT ROUTINE_NAME, ROUTINE_DEFINITION, ROUTINE_COMMENT, DATA_TYPE
FROM INFORMATION_SCHEMA.ROUTINES
WHERE ROUTINE_SCHEMA = ? AND ROUTINE_TYPE = 'FUNCTION'
`, [database]);
const functionInfos: FunctionInfo[] = [];
for (const func of functions as any[]) {
const parameters = await this.discoverParameters(connection, database, func.ROUTINE_NAME);
functionInfos.push({
name: func.ROUTINE_NAME,
schema: database,
parameters,
returnType: this.mapDataType(func.DATA_TYPE),
definition: func.ROUTINE_DEFINITION,
comment: func.ROUTINE_COMMENT
});
}
return functionInfos;
}
private async discoverParameters(connection: Connection, database: string, routineName: string): Promise<ParameterInfo[]> {
const [parameters] = await connection.execute(`
SELECT PARAMETER_NAME, DATA_TYPE, PARAMETER_MODE
FROM INFORMATION_SCHEMA.PARAMETERS
WHERE SPECIFIC_SCHEMA = ? AND SPECIFIC_NAME = ?
ORDER BY ORDINAL_POSITION
`, [database, routineName]);
return (parameters as any[]).map(param => ({
name: param.PARAMETER_NAME,
type: this.mapDataType(param.DATA_TYPE),
direction: param.PARAMETER_MODE as 'IN' | 'OUT' | 'INOUT',
nullable: true
}));
}
mapDataType(nativeType: string): string {
const typeMap: Record<string, string> = {
'varchar': 'string',
'char': 'string',
'text': 'string',
'longtext': 'string',
'int': 'number',
'bigint': 'number',
'decimal': 'number',
'float': 'number',
'double': 'number',
'boolean': 'boolean',
'tinyint': 'boolean',
'date': 'date',
'datetime': 'datetime',
'timestamp': 'datetime',
'json': 'object',
'blob': 'binary'
};
return typeMap[nativeType.toLowerCase()] || 'string';
}
formatValue(value: any, type: string): any {
switch (type) {
case 'date':
case 'datetime':
return value instanceof Date ? value.toISOString() : value;
case 'boolean':
return Boolean(value);
case 'number':
return Number(value);
case 'object':
return typeof value === 'string' ? JSON.parse(value) : value;
default:
return value;
}
}
buildSelectQuery(table: string, options: SelectOptions): string {
let sql = 'SELECT ';
// 列
if (options.columns && options.columns.length > 0) {
sql += options.columns.map(col => `\`${col}\``).join(', ');
} else {
sql += '*';
}
sql += ` FROM \`${table}\``;
// JOIN
if (options.joins) {
for (const join of options.joins) {
sql += ` ${join.type} JOIN \`${join.table}\` ON ${join.on}`;
}
}
// WHERE
if (options.where) {
const conditions = Object.entries(options.where)
.map(([key, value]) => `\`${key}\` = ${this.formatSQLValue(value)}`)
.join(' AND ');
sql += ` WHERE ${conditions}`;
}
// ORDER BY
if (options.orderBy) {
const orderClauses = Object.entries(options.orderBy)
.map(([key, direction]) => `\`${key}\` ${direction}`)
.join(', ');
sql += ` ORDER BY ${orderClauses}`;
}
// LIMIT
if (options.limit) {
sql += ` LIMIT ${options.limit}`;
if (options.offset) {
sql += ` OFFSET ${options.offset}`;
}
}
return sql;
}
buildInsertQuery(table: string, data: Record<string, any>): string {
const columns = Object.keys(data).map(col => `\`${col}\``).join(', ');
const values = Object.values(data).map(val => this.formatSQLValue(val)).join(', ');
return `INSERT INTO \`${table}\` (${columns}) VALUES (${values})`;
}
buildUpdateQuery(table: string, data: Record<string, any>, where: Record<string, any>): string {
const setClauses = Object.entries(data)
.map(([key, value]) => `\`${key}\` = ${this.formatSQLValue(value)}`)
.join(', ');
const whereConditions = Object.entries(where)
.map(([key, value]) => `\`${key}\` = ${this.formatSQLValue(value)}`)
.join(' AND ');
return `UPDATE \`${table}\` SET ${setClauses} WHERE ${whereConditions}`;
}
buildDeleteQuery(table: string, where: Record<string, any>): string {
const whereConditions = Object.entries(where)
.map(([key, value]) => `\`${key}\` = ${this.formatSQLValue(value)}`)
.join(' AND ');
return `DELETE FROM \`${table}\` WHERE ${whereConditions}`;
}
private formatSQLValue(value: any): string {
if (value === null || value === undefined) {
return 'NULL';
}
if (typeof value === 'string') {
return `'${value.replace(/'/g, "''")}'`;
}
if (typeof value === 'boolean') {
return value ? '1' : '0';
}
if (value instanceof Date) {
return `'${value.toISOString().slice(0, 19).replace('T', ' ')}'`;
}
return String(value);
}
}
// MySQL 连接实现
class MySQLConnection implements Connection {
constructor(
public id: string,
public dataSourceId: string,
private nativeConnection: any
) {}
isConnected(): boolean {
return this.nativeConnection && !this.nativeConnection.destroyed;
}
async close(): Promise<void> {
if (this.nativeConnection) {
await this.nativeConnection.end();
}
}
async execute(query: string, params?: any[]): Promise<any> {
return this.nativeConnection.execute(query, params);
}
}
// MySQL 连接池实现
class MySQLConnectionPool implements ConnectionPool {
constructor(private pool: any) {}
async getConnection(): Promise<Connection> {
const nativeConnection = await this.pool.getConnection();
return new MySQLConnection(generateId(), '', nativeConnection);
}
async releaseConnection(connection: Connection): Promise<void> {
const mysqlConnection = connection as MySQLConnection;
if (mysqlConnection.nativeConnection) {
mysqlConnection.nativeConnection.release();
}
}
async close(): Promise<void> {
await this.pool.end();
}
getStats(): PoolStats {
return {
totalConnections: this.pool.config.connectionLimit,
activeConnections: this.pool._allConnections.length,
idleConnections: this.pool._freeConnections.length,
waitingRequests: this.pool._connectionQueue.length
};
}
}
11.4.2 数据模型相关接口
// 模型定义
interface ModelDefinition {
name: string;
displayName?: string;
description?: string;
fields: FieldDefinition[];
relations?: RelationDefinition[];
indexes?: IndexDefinition[];
constraints?: ConstraintDefinition[];
options?: ModelOptions;
metadata?: Record<string, any>;
}
// 数据模型
interface DataModel extends ModelDefinition {
id: string;
version: number;
status: ModelStatus;
createdAt: Date;
updatedAt: Date;
}
// 模型状态
enum ModelStatus {
DRAFT = 'draft',
PUBLISHED = 'published',
DEPRECATED = 'deprecated'
}
// 字段定义
interface FieldDefinition {
id?: string;
name: string;
displayName?: string;
type: FieldType;
required: boolean;
unique?: boolean;
defaultValue?: any;
validation?: FieldValidation;
options?: FieldOptions;
description?: string;
}
// 字段类型
enum FieldType {
STRING = 'string',
NUMBER = 'number',
BOOLEAN = 'boolean',
DATE = 'date',
DATETIME = 'datetime',
TIME = 'time',
EMAIL = 'email',
URL = 'url',
PHONE = 'phone',
JSON = 'json',
TEXT = 'text',
BINARY = 'binary',
ENUM = 'enum',
ARRAY = 'array',
OBJECT = 'object',
REFERENCE = 'reference'
}
// 字段验证
interface FieldValidation {
minLength?: number;
maxLength?: number;
min?: number;
max?: number;
pattern?: string;
enum?: any[];
custom?: string;
}
// 字段选项
interface FieldOptions {
autoIncrement?: boolean;
precision?: number;
scale?: number;
format?: string;
enumValues?: EnumValue[];
referenceModel?: string;
referenceField?: string;
cascadeDelete?: boolean;
}
// 枚举值
interface EnumValue {
value: any;
label: string;
description?: string;
}
// 关系定义
interface RelationDefinition {
id?: string;
name: string;
type: RelationType;
sourceField: string;
targetModel: string;
targetField: string;
cascadeDelete?: boolean;
cascadeUpdate?: boolean;
description?: string;
}
// 关系类型
enum RelationType {
ONE_TO_ONE = 'one_to_one',
ONE_TO_MANY = 'one_to_many',
MANY_TO_ONE = 'many_to_one',
MANY_TO_MANY = 'many_to_many'
}
// 索引定义
interface IndexDefinition {
id?: string;
name: string;
fields: string[];
unique?: boolean;
type?: IndexType;
options?: Record<string, any>;
}
// 索引类型
enum IndexType {
BTREE = 'btree',
HASH = 'hash',
FULLTEXT = 'fulltext',
SPATIAL = 'spatial'
}
// 约束定义
interface ConstraintDefinition {
id?: string;
name: string;
type: ConstraintType;
fields: string[];
expression?: string;
options?: Record<string, any>;
}
// 模型选项
interface ModelOptions {
tableName?: string;
timestamps?: boolean;
softDelete?: boolean;
versioning?: boolean;
auditing?: boolean;
caching?: boolean;
[key: string]: any;
}
// 模型查询
interface ModelQuery {
status?: ModelStatus;
search?: string;
tags?: string[];
sortBy?: string;
sortOrder?: 'asc' | 'desc';
limit?: number;
offset?: number;
}
// 数据模型管理器选项
interface DataModelManagerOptions {
eventBus?: EventBus;
dataSourceManager: DataSourceManager;
auditLogger: AuditLogger;
}
// 验证结果
interface ValidationResult {
isValid: boolean;
errors: string[];
warnings: string[];
}
// 同步结果
interface SyncResult {
success: boolean;
error?: string;
schema?: string;
executionTime?: number;
}
11.4.3 模式生成器
// 模式生成器接口
interface SchemaGenerator {
generate(model: DataModel): string;
generateField(field: FieldDefinition): string;
generateIndex(index: IndexDefinition): string;
generateConstraint(constraint: ConstraintDefinition): string;
}
// MySQL 模式生成器
class MySQLSchemaGenerator implements SchemaGenerator {
generate(model: DataModel): string {
const tableName = model.options.tableName || model.name;
let sql = `CREATE TABLE \`${tableName}\` (\n`;
// 生成字段定义
const fieldDefinitions = model.fields.map(field =>
' ' + this.generateField(field)
);
sql += fieldDefinitions.join(',\n');
// 生成主键
const primaryKeyFields = model.fields.filter(f => f.options?.autoIncrement || f.name === 'id');
if (primaryKeyFields.length > 0) {
sql += `,\n PRIMARY KEY (${primaryKeyFields.map(f => `\`${f.name}\``).join(', ')})`;
}
// 生成外键
const foreignKeys = model.relations.filter(r => r.type === RelationType.MANY_TO_ONE);
for (const fk of foreignKeys) {
sql += `,\n FOREIGN KEY (\`${fk.sourceField}\`) REFERENCES \`${fk.targetModel}\`(\`${fk.targetField}\`)`;
if (fk.cascadeDelete) {
sql += ' ON DELETE CASCADE';
}
if (fk.cascadeUpdate) {
sql += ' ON UPDATE CASCADE';
}
}
sql += '\n)';
// 添加表选项
sql += ' ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci';
if (model.description) {
sql += ` COMMENT='${model.description.replace(/'/g, "''")}'`;
}
sql += ';\n\n';
// 生成索引
for (const index of model.indexes) {
sql += this.generateIndex(index) + ';\n';
}
return sql;
}
generateField(field: FieldDefinition): string {
let sql = `\`${field.name}\` `;
// 数据类型
switch (field.type) {
case FieldType.STRING:
const maxLength = field.validation?.maxLength || 255;
sql += `VARCHAR(${maxLength})`;
break;
case FieldType.TEXT:
sql += 'TEXT';
break;
case FieldType.NUMBER:
if (field.options?.precision && field.options?.scale) {
sql += `DECIMAL(${field.options.precision}, ${field.options.scale})`;
} else {
sql += 'INT';
}
break;
case FieldType.BOOLEAN:
sql += 'BOOLEAN';
break;
case FieldType.DATE:
sql += 'DATE';
break;
case FieldType.DATETIME:
sql += 'DATETIME';
break;
case FieldType.TIME:
sql += 'TIME';
break;
case FieldType.EMAIL:
case FieldType.URL:
case FieldType.PHONE:
sql += 'VARCHAR(255)';
break;
case FieldType.JSON:
sql += 'JSON';
break;
case FieldType.BINARY:
sql += 'BLOB';
break;
case FieldType.ENUM:
if (field.options?.enumValues) {
const values = field.options.enumValues
.map(ev => `'${ev.value.toString().replace(/'/g, "''")}'`)
.join(', ');
sql += `ENUM(${values})`;
} else {
sql += 'VARCHAR(255)';
}
break;
case FieldType.REFERENCE:
sql += 'INT';
break;
default:
sql += 'VARCHAR(255)';
}
// 自增
if (field.options?.autoIncrement) {
sql += ' AUTO_INCREMENT';
}
// 非空
if (field.required) {
sql += ' NOT NULL';
} else {
sql += ' NULL';
}
// 默认值
if (field.defaultValue !== undefined) {
if (field.type === FieldType.STRING || field.type === FieldType.TEXT) {
sql += ` DEFAULT '${field.defaultValue.toString().replace(/'/g, "''")}'`;
} else if (field.type === FieldType.BOOLEAN) {
sql += ` DEFAULT ${field.defaultValue ? 1 : 0}`;
} else {
sql += ` DEFAULT ${field.defaultValue}`;
}
}
// 唯一约束
if (field.unique) {
sql += ' UNIQUE';
}
// 注释
if (field.description) {
sql += ` COMMENT '${field.description.replace(/'/g, "''")}'`;
}
return sql;
}
generateIndex(index: IndexDefinition): string {
let sql = 'CREATE ';
if (index.unique) {
sql += 'UNIQUE ';
}
if (index.type === IndexType.FULLTEXT) {
sql += 'FULLTEXT ';
} else if (index.type === IndexType.SPATIAL) {
sql += 'SPATIAL ';
}
sql += `INDEX \`${index.name}\` `;
sql += `(${index.fields.map(f => `\`${f}\``).join(', ')})`;
if (index.type === IndexType.HASH) {
sql += ' USING HASH';
} else if (index.type === IndexType.BTREE) {
sql += ' USING BTREE';
}
return sql;
}
generateConstraint(constraint: ConstraintDefinition): string {
let sql = `CONSTRAINT \`${constraint.name}\` `;
switch (constraint.type) {
case ConstraintType.CHECK:
sql += `CHECK (${constraint.expression})`;
break;
case ConstraintType.UNIQUE:
sql += `UNIQUE (${constraint.fields.map(f => `\`${f}\``).join(', ')})`;
break;
default:
throw new Error(`不支持的约束类型: ${constraint.type}`);
}
return sql;
}
}
// JSON Schema 生成器
class JSONSchemaGenerator implements SchemaGenerator {
generate(model: DataModel): string {
const schema = {
$schema: 'http://json-schema.org/draft-07/schema#',
type: 'object',
title: model.displayName || model.name,
description: model.description,
properties: {} as Record<string, any>,
required: [] as string[]
};
// 生成字段属性
for (const field of model.fields) {
schema.properties[field.name] = this.generateFieldSchema(field);
if (field.required) {
schema.required.push(field.name);
}
}
return JSON.stringify(schema, null, 2);
}
private generateFieldSchema(field: FieldDefinition): any {
const schema: any = {
title: field.displayName || field.name,
description: field.description
};
switch (field.type) {
case FieldType.STRING:
case FieldType.EMAIL:
case FieldType.URL:
case FieldType.PHONE:
schema.type = 'string';
if (field.validation?.minLength) schema.minLength = field.validation.minLength;
if (field.validation?.maxLength) schema.maxLength = field.validation.maxLength;
if (field.validation?.pattern) schema.pattern = field.validation.pattern;
break;
case FieldType.TEXT:
schema.type = 'string';
break;
case FieldType.NUMBER:
schema.type = 'number';
if (field.validation?.min !== undefined) schema.minimum = field.validation.min;
if (field.validation?.max !== undefined) schema.maximum = field.validation.max;
break;
case FieldType.BOOLEAN:
schema.type = 'boolean';
break;
case FieldType.DATE:
case FieldType.DATETIME:
case FieldType.TIME:
schema.type = 'string';
schema.format = field.type;
break;
case FieldType.ENUM:
schema.type = 'string';
if (field.options?.enumValues) {
schema.enum = field.options.enumValues.map(ev => ev.value);
}
break;
case FieldType.ARRAY:
schema.type = 'array';
break;
case FieldType.OBJECT:
case FieldType.JSON:
schema.type = 'object';
break;
default:
schema.type = 'string';
}
if (field.defaultValue !== undefined) {
schema.default = field.defaultValue;
}
return schema;
}
generateField(field: FieldDefinition): string {
return JSON.stringify(this.generateFieldSchema(field), null, 2);
}
generateIndex(index: IndexDefinition): string {
return JSON.stringify({
name: index.name,
fields: index.fields,
unique: index.unique,
type: index.type
}, null, 2);
}
generateConstraint(constraint: ConstraintDefinition): string {
return JSON.stringify({
name: constraint.name,
type: constraint.type,
fields: constraint.fields,
expression: constraint.expression
}, null, 2);
}
}
// TypeScript 接口生成器
class TypeScriptSchemaGenerator implements SchemaGenerator {
generate(model: DataModel): string {
let ts = `// ${model.displayName || model.name}\n`;
if (model.description) {
ts += `// ${model.description}\n`;
}
ts += `interface ${this.toPascalCase(model.name)} {\n`;
// 生成字段
for (const field of model.fields) {
ts += ' ' + this.generateField(field) + '\n';
}
ts += '}\n\n';
// 生成创建接口
ts += `interface Create${this.toPascalCase(model.name)} {\n`;
for (const field of model.fields) {
if (!field.options?.autoIncrement) {
ts += ' ' + this.generateField(field) + '\n';
}
}
ts += '}\n\n';
// 生成更新接口
ts += `interface Update${this.toPascalCase(model.name)} {\n`;
for (const field of model.fields) {
if (!field.options?.autoIncrement) {
const fieldDef = this.generateField(field).replace(':', '?:');
ts += ' ' + fieldDef + '\n';
}
}
ts += '}\n';
return ts;
}
generateField(field: FieldDefinition): string {
let ts = '';
// 注释
if (field.description) {
ts += `// ${field.description}\n `;
}
ts += field.name;
// 可选标记
if (!field.required) {
ts += '?';
}
ts += ': ';
// 类型
switch (field.type) {
case FieldType.STRING:
case FieldType.TEXT:
case FieldType.EMAIL:
case FieldType.URL:
case FieldType.PHONE:
ts += 'string';
break;
case FieldType.NUMBER:
ts += 'number';
break;
case FieldType.BOOLEAN:
ts += 'boolean';
break;
case FieldType.DATE:
case FieldType.DATETIME:
case FieldType.TIME:
ts += 'Date';
break;
case FieldType.ENUM:
if (field.options?.enumValues) {
const values = field.options.enumValues
.map(ev => `'${ev.value}'`)
.join(' | ');
ts += values;
} else {
ts += 'string';
}
break;
case FieldType.ARRAY:
ts += 'any[]';
break;
case FieldType.OBJECT:
case FieldType.JSON:
ts += 'Record<string, any>';
break;
case FieldType.REFERENCE:
if (field.options?.referenceModel) {
ts += this.toPascalCase(field.options.referenceModel);
} else {
ts += 'string';
}
break;
default:
ts += 'any';
}
ts += ';';
return ts;
}
generateIndex(index: IndexDefinition): string {
return `// Index: ${index.name} on [${index.fields.join(', ')}]${index.unique ? ' (unique)' : ''}`;
}
generateConstraint(constraint: ConstraintDefinition): string {
return `// Constraint: ${constraint.name} (${constraint.type}) on [${constraint.fields.join(', ')}]`;
}
private toPascalCase(str: string): string {
return str.replace(/(?:^|_)([a-z])/g, (_, char) => char.toUpperCase());
}
}
11.5 使用示例
11.5.1 数据源管理与数据建模完整示例
// 数据源管理与数据建模示例
class LowCodeDataManagementDemo {
private dataSourceManager: DataSourceManager;
private dataModelManager: DataModelManager;
private eventBus: EventBus;
constructor() {
this.eventBus = new EventBus();
this.dataSourceManager = new LowCodeDataSourceManager({
eventBus: this.eventBus,
auditLogger: new AuditLogger()
});
this.dataModelManager = new LowCodeDataModelManager({
eventBus: this.eventBus,
dataSourceManager: this.dataSourceManager,
auditLogger: new AuditLogger()
});
this.setupEventListeners();
}
// 设置事件监听器
private setupEventListeners(): void {
this.eventBus.on('dataSource.connected', (event) => {
console.log('数据源连接成功:', event.dataSourceId);
});
this.eventBus.on('dataSource.error', (event) => {
console.error('数据源错误:', event.error);
});
this.eventBus.on('model.created', (event) => {
console.log('数据模型创建成功:', event.modelId);
});
this.eventBus.on('model.synced', (event) => {
console.log('模型同步到数据源成功:', event.modelId);
});
}
// 演示数据源管理
async demonstrateDataSourceManagement(): Promise<void> {
console.log('=== 数据源管理演示 ===');
// 1. 创建MySQL数据源
const mysqlConfig: DataSourceConfig = {
id: 'mysql-main',
name: 'MySQL主数据库',
type: DataSourceType.MYSQL,
host: 'localhost',
port: 3306,
database: 'lowcode_platform',
username: 'root',
password: 'password',
options: {
charset: 'utf8mb4',
timezone: '+08:00',
acquireTimeout: 60000,
timeout: 60000
}
};
const mysqlDataSource = await this.dataSourceManager.createDataSource(mysqlConfig);
console.log('MySQL数据源创建成功:', mysqlDataSource.id);
// 2. 测试连接
const testResult = await this.dataSourceManager.testConnection(mysqlDataSource.id);
console.log('连接测试结果:', testResult);
// 3. 发现数据库模式
const schemaInfo = await this.dataSourceManager.discoverSchema(mysqlDataSource.id);
console.log('数据库模式信息:', {
tables: schemaInfo.tables.length,
views: schemaInfo.views.length,
procedures: schemaInfo.procedures.length
});
// 4. 创建PostgreSQL数据源
const postgresConfig: DataSourceConfig = {
id: 'postgres-analytics',
name: 'PostgreSQL分析数据库',
type: DataSourceType.POSTGRESQL,
host: 'localhost',
port: 5432,
database: 'analytics',
username: 'postgres',
password: 'password',
options: {
ssl: false,
max: 20,
idleTimeoutMillis: 30000
}
};
const postgresDataSource = await this.dataSourceManager.createDataSource(postgresConfig);
console.log('PostgreSQL数据源创建成功:', postgresDataSource.id);
// 5. 查询数据源列表
const dataSources = await this.dataSourceManager.getDataSources();
console.log('数据源列表:', dataSources.map(ds => ({ id: ds.id, name: ds.name, type: ds.type })));
}
// 演示数据建模
async demonstrateDataModeling(): Promise<void> {
console.log('\n=== 数据建模演示 ===');
// 1. 创建用户模型
const userModel: ModelDefinition = {
name: 'user',
displayName: '用户',
description: '系统用户信息',
fields: [
{
name: 'id',
displayName: 'ID',
type: FieldType.NUMBER,
required: true,
options: { autoIncrement: true },
description: '用户唯一标识'
},
{
name: 'username',
displayName: '用户名',
type: FieldType.STRING,
required: true,
unique: true,
validation: { minLength: 3, maxLength: 50, pattern: '^[a-zA-Z0-9_]+$' },
description: '用户登录名'
},
{
name: 'email',
displayName: '邮箱',
type: FieldType.EMAIL,
required: true,
unique: true,
validation: { maxLength: 255 },
description: '用户邮箱地址'
},
{
name: 'password',
displayName: '密码',
type: FieldType.STRING,
required: true,
validation: { minLength: 8, maxLength: 255 },
description: '用户密码(加密存储)'
},
{
name: 'status',
displayName: '状态',
type: FieldType.ENUM,
required: true,
defaultValue: 'active',
options: {
enumValues: [
{ value: 'active', label: '激活', description: '用户账户正常' },
{ value: 'inactive', label: '未激活', description: '用户账户未激活' },
{ value: 'suspended', label: '暂停', description: '用户账户被暂停' }
]
},
description: '用户账户状态'
},
{
name: 'profile',
displayName: '个人资料',
type: FieldType.JSON,
required: false,
description: '用户个人资料信息'
},
{
name: 'createdAt',
displayName: '创建时间',
type: FieldType.DATETIME,
required: true,
defaultValue: 'CURRENT_TIMESTAMP',
description: '账户创建时间'
},
{
name: 'updatedAt',
displayName: '更新时间',
type: FieldType.DATETIME,
required: true,
defaultValue: 'CURRENT_TIMESTAMP',
description: '账户最后更新时间'
}
],
indexes: [
{
name: 'idx_user_username',
fields: ['username'],
unique: true
},
{
name: 'idx_user_email',
fields: ['email'],
unique: true
},
{
name: 'idx_user_status',
fields: ['status']
}
],
options: {
tableName: 'users',
timestamps: true,
softDelete: false,
auditing: true
}
};
const createdUserModel = await this.dataModelManager.createModel(userModel);
console.log('用户模型创建成功:', createdUserModel.id);
// 2. 创建文章模型
const articleModel: ModelDefinition = {
name: 'article',
displayName: '文章',
description: '博客文章信息',
fields: [
{
name: 'id',
displayName: 'ID',
type: FieldType.NUMBER,
required: true,
options: { autoIncrement: true }
},
{
name: 'title',
displayName: '标题',
type: FieldType.STRING,
required: true,
validation: { minLength: 1, maxLength: 200 }
},
{
name: 'content',
displayName: '内容',
type: FieldType.TEXT,
required: true
},
{
name: 'authorId',
displayName: '作者ID',
type: FieldType.REFERENCE,
required: true,
options: {
referenceModel: 'user',
referenceField: 'id',
cascadeDelete: false
}
},
{
name: 'status',
displayName: '状态',
type: FieldType.ENUM,
required: true,
defaultValue: 'draft',
options: {
enumValues: [
{ value: 'draft', label: '草稿' },
{ value: 'published', label: '已发布' },
{ value: 'archived', label: '已归档' }
]
}
},
{
name: 'publishedAt',
displayName: '发布时间',
type: FieldType.DATETIME,
required: false
},
{
name: 'createdAt',
displayName: '创建时间',
type: FieldType.DATETIME,
required: true,
defaultValue: 'CURRENT_TIMESTAMP'
},
{
name: 'updatedAt',
displayName: '更新时间',
type: FieldType.DATETIME,
required: true,
defaultValue: 'CURRENT_TIMESTAMP'
}
],
relations: [
{
name: 'author',
type: RelationType.MANY_TO_ONE,
sourceField: 'authorId',
targetModel: 'user',
targetField: 'id',
cascadeDelete: false
}
],
indexes: [
{
name: 'idx_article_author',
fields: ['authorId']
},
{
name: 'idx_article_status',
fields: ['status']
},
{
name: 'idx_article_published',
fields: ['publishedAt']
}
],
options: {
tableName: 'articles',
timestamps: true
}
};
const createdArticleModel = await this.dataModelManager.createModel(articleModel);
console.log('文章模型创建成功:', createdArticleModel.id);
// 3. 验证模型
const userValidation = await this.dataModelManager.validateModel(createdUserModel.id);
console.log('用户模型验证结果:', userValidation);
const articleValidation = await this.dataModelManager.validateModel(createdArticleModel.id);
console.log('文章模型验证结果:', articleValidation);
// 4. 生成数据库模式
const userSchema = await this.dataModelManager.generateSchema(createdUserModel.id, 'mysql');
console.log('用户模型MySQL模式:\n', userSchema);
const articleSchema = await this.dataModelManager.generateSchema(createdArticleModel.id, 'mysql');
console.log('文章模型MySQL模式:\n', articleSchema);
// 5. 生成TypeScript接口
const userTypeScript = await this.dataModelManager.generateSchema(createdUserModel.id, 'typescript');
console.log('用户模型TypeScript接口:\n', userTypeScript);
// 6. 同步到数据源
const syncResult = await this.dataModelManager.syncToDataSource(
createdUserModel.id,
'mysql-main'
);
console.log('用户模型同步结果:', syncResult);
}
// 演示数据操作
async demonstrateDataOperations(): Promise<void> {
console.log('\n=== 数据操作演示 ===');
// 1. 插入用户数据
const insertUserResult = await this.dataSourceManager.executeCommand('mysql-main', {
type: 'insert',
table: 'users',
data: {
username: 'john_doe',
email: 'john@example.com',
password: 'hashed_password_123',
status: 'active',
profile: JSON.stringify({
firstName: 'John',
lastName: 'Doe',
age: 30
})
}
});
console.log('插入用户结果:', insertUserResult);
// 2. 查询用户数据
const queryUserResult = await this.dataSourceManager.executeQuery('mysql-main', {
sql: 'SELECT * FROM users WHERE username = ?',
parameters: ['john_doe']
});
console.log('查询用户结果:', queryUserResult);
// 3. 插入文章数据
const insertArticleResult = await this.dataSourceManager.executeCommand('mysql-main', {
type: 'insert',
table: 'articles',
data: {
title: '我的第一篇文章',
content: '这是文章的内容...',
authorId: 1,
status: 'published',
publishedAt: new Date().toISOString()
}
});
console.log('插入文章结果:', insertArticleResult);
// 4. 关联查询
const joinQueryResult = await this.dataSourceManager.executeQuery('mysql-main', {
sql: `
SELECT a.*, u.username as author_name
FROM articles a
JOIN users u ON a.authorId = u.id
WHERE a.status = ?
`,
parameters: ['published']
});
console.log('关联查询结果:', joinQueryResult);
}
// 演示模型管理
async demonstrateModelManagement(): Promise<void> {
console.log('\n=== 模型管理演示 ===');
// 1. 查询模型列表
const models = await this.dataModelManager.getModels();
console.log('模型列表:', models.map(m => ({ id: m.id, name: m.name, status: m.status })));
// 2. 获取特定模型
const userModel = await this.dataModelManager.getModel('user');
if (userModel) {
console.log('用户模型详情:', {
name: userModel.name,
fieldsCount: userModel.fields.length,
indexesCount: userModel.indexes?.length || 0
});
}
// 3. 更新模型(添加新字段)
if (userModel) {
const updatedModel = {
...userModel,
fields: [
...userModel.fields,
{
name: 'lastLoginAt',
displayName: '最后登录时间',
type: FieldType.DATETIME,
required: false,
description: '用户最后一次登录时间'
}
]
};
const updateResult = await this.dataModelManager.updateModel(userModel.id, updatedModel);
console.log('模型更新成功:', updateResult.id);
}
// 4. 复制模型
const copiedModel = await this.dataModelManager.copyModel('user', 'admin_user');
console.log('模型复制成功:', copiedModel.id);
// 5. 导出模型
const exportedModel = await this.dataModelManager.exportModel('user');
console.log('模型导出成功,字段数量:', exportedModel.fields.length);
}
// 运行完整演示
async runDemo(): Promise<void> {
try {
await this.demonstrateDataSourceManagement();
await this.demonstrateDataModeling();
await this.demonstrateDataOperations();
await this.demonstrateModelManagement();
console.log('\n=== 演示完成 ===');
} catch (error) {
console.error('演示过程中发生错误:', error);
}
}
}
// 使用示例
const demo = new LowCodeDataManagementDemo();
demo.runDemo();
11.5.2 Web API 集成示例
// Express.js 集成示例
import express from 'express';
import { DataSourceManager, DataModelManager } from './data-management';
const app = express();
app.use(express.json());
// 初始化管理器
const dataSourceManager = new LowCodeDataSourceManager();
const dataModelManager = new LowCodeDataModelManager({
dataSourceManager,
eventBus: new EventBus(),
auditLogger: new AuditLogger()
});
// 数据源管理 API
app.get('/api/datasources', async (req, res) => {
try {
const dataSources = await dataSourceManager.getDataSources();
res.json(dataSources);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/datasources', async (req, res) => {
try {
const dataSource = await dataSourceManager.createDataSource(req.body);
res.status(201).json(dataSource);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
app.post('/api/datasources/:id/test', async (req, res) => {
try {
const result = await dataSourceManager.testConnection(req.params.id);
res.json(result);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// 数据模型管理 API
app.get('/api/models', async (req, res) => {
try {
const models = await dataModelManager.getModels(req.query);
res.json(models);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/models', async (req, res) => {
try {
const model = await dataModelManager.createModel(req.body);
res.status(201).json(model);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
app.get('/api/models/:id/schema/:type', async (req, res) => {
try {
const schema = await dataModelManager.generateSchema(
req.params.id,
req.params.type
);
res.text(schema);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/models/:id/sync/:dataSourceId', async (req, res) => {
try {
const result = await dataModelManager.syncToDataSource(
req.params.id,
req.params.dataSourceId
);
res.json(result);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
// 数据查询 API
app.post('/api/datasources/:id/query', async (req, res) => {
try {
const result = await dataSourceManager.executeQuery(
req.params.id,
req.body
);
res.json(result);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.post('/api/datasources/:id/command', async (req, res) => {
try {
const result = await dataSourceManager.executeCommand(
req.params.id,
req.body
);
res.json(result);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.listen(3000, () => {
console.log('数据管理API服务器启动在端口 3000');
});
11.6 小结
本章详细介绍了低代码平台的数据源管理与数据建模系统,这是平台数据层的核心基础设施。
11.6.1 核心要点回顾
数据源管理架构
- 统一的数据源管理接口
- 多种数据库类型支持
- 连接池和连接管理
- 数据源适配器模式
数据建模系统
- 可视化数据模型定义
- 丰富的字段类型支持
- 关系和约束管理
- 模型验证和同步
模式生成器
- 多种目标格式支持
- MySQL/PostgreSQL DDL生成
- JSON Schema生成
- TypeScript接口生成
数据操作
- 统一的查询和命令接口
- 参数化查询支持
- 事务管理
- 错误处理和重试
模型管理
- 模型版本控制
- 模型导入导出
- 模型复制和模板
- 模型同步到数据源
11.6.2 技术特色
- 类型安全:完整的TypeScript类型定义,编译时错误检查
- 适配器模式:支持多种数据库类型的统一接口
- 事件驱动:完整的事件系统,支持实时监控和响应
- 可扩展性:插件化的适配器和生成器架构
- 高性能:连接池管理和查询优化
- 安全性:参数化查询防止SQL注入
11.6.3 应用场景
- 企业数据集成:连接多种企业数据源
- 快速原型开发:可视化数据模型设计
- 数据迁移:跨数据库的模式转换
- API开发:自动生成数据访问接口
- 报表系统:统一的数据查询接口
11.6.4 最佳实践
- 连接管理:合理配置连接池参数
- 模型设计:遵循数据库设计范式
- 性能优化:合理使用索引和查询优化
- 安全考虑:使用参数化查询和权限控制
- 监控告警:实时监控数据源状态和性能
下一章我们将学习工作流引擎与自动化,了解如何在低代码平台中实现业务流程自动化和工作流管理。
11.4.1 数据模型管理器
// 数据模型管理器接口
interface DataModelManager {
// 模型管理
createModel(definition: ModelDefinition): Promise<DataModel>;
updateModel(id: string, definition: Partial<ModelDefinition>): Promise<DataModel>;
deleteModel(id: string): Promise<void>;
getModel(id: string): Promise<DataModel | null>;
queryModels(query: ModelQuery): Promise<DataModel[]>;
// 字段管理
addField(modelId: string, field: FieldDefinition): Promise<DataModel>;
updateField(modelId: string, fieldId: string, field: Partial<FieldDefinition>): Promise<DataModel>;
removeField(modelId: string, fieldId: string): Promise<DataModel>;
// 关系管理
addRelation(modelId: string, relation: RelationDefinition): Promise<DataModel>;
updateRelation(modelId: string, relationId: string, relation: Partial<RelationDefinition>): Promise<DataModel>;
removeRelation(modelId: string, relationId: string): Promise<DataModel>;
// 验证和生成
validateModel(definition: ModelDefinition): ValidationResult;
generateSchema(modelId: string, targetType: string): Promise<string>;
syncToDataSource(modelId: string, dataSourceId: string): Promise<SyncResult>;
// 事件处理
on(event: string, listener: Function): void;
off(event: string, listener: Function): void;
emit(event: string, data: any): void;
}
// 低代码数据模型管理器实现
class LowCodeDataModelManager implements DataModelManager {
private models: Map<string, DataModel> = new Map();
private eventBus: EventBus;
private dataSourceManager: DataSourceManager;
private validationEngine: ValidationEngine;
private schemaGenerators: Map<string, SchemaGenerator> = new Map();
private auditLogger: AuditLogger;
private options: DataModelManagerOptions;
constructor(options: DataModelManagerOptions) {
this.options = options;
this.eventBus = options.eventBus || new EventBus();
this.dataSourceManager = options.dataSourceManager;
this.validationEngine = new DefaultValidationEngine();
this.auditLogger = options.auditLogger;
this.initializeSchemaGenerators();
}
private initializeSchemaGenerators(): void {
this.schemaGenerators.set('mysql', new MySQLSchemaGenerator());
this.schemaGenerators.set('postgresql', new PostgreSQLSchemaGenerator());
this.schemaGenerators.set('mongodb', new MongoDBSchemaGenerator());
this.schemaGenerators.set('json', new JSONSchemaGenerator());
this.schemaGenerators.set('typescript', new TypeScriptSchemaGenerator());
}
async createModel(definition: ModelDefinition): Promise<DataModel> {
// 验证模型定义
const validation = this.validateModel(definition);
if (!validation.isValid) {
throw new Error(`模型定义无效: ${validation.errors.join(', ')}`);
}
// 创建数据模型
const model: DataModel = {
id: generateId(),
name: definition.name,
displayName: definition.displayName || definition.name,
description: definition.description,
fields: definition.fields.map(field => ({
...field,
id: field.id || generateId()
})),
relations: definition.relations?.map(relation => ({
...relation,
id: relation.id || generateId()
})) || [],
indexes: definition.indexes || [],
constraints: definition.constraints || [],
options: definition.options || {},
version: 1,
status: ModelStatus.DRAFT,
createdAt: new Date(),
updatedAt: new Date(),
metadata: definition.metadata || {}
};
this.models.set(model.id, model);
// 记录审计日志
await this.auditLogger.log({
action: 'model:create',
details: { modelId: model.id, name: model.name },
timestamp: new Date()
});
// 触发事件
this.emit('model:created', { model });
return model;
}
async updateModel(id: string, definition: Partial<ModelDefinition>): Promise<DataModel> {
const model = this.models.get(id);
if (!model) {
throw new Error(`数据模型不存在: ${id}`);
}
// 创建更新后的模型定义
const updatedDefinition: ModelDefinition = {
name: definition.name || model.name,
displayName: definition.displayName || model.displayName,
description: definition.description || model.description,
fields: definition.fields || model.fields,
relations: definition.relations || model.relations,
indexes: definition.indexes || model.indexes,
constraints: definition.constraints || model.constraints,
options: definition.options || model.options,
metadata: definition.metadata || model.metadata
};
// 验证更新后的模型
const validation = this.validateModel(updatedDefinition);
if (!validation.isValid) {
throw new Error(`模型定义无效: ${validation.errors.join(', ')}`);
}
// 更新模型
Object.assign(model, {
...updatedDefinition,
version: model.version + 1,
updatedAt: new Date()
});
// 记录审计日志
await this.auditLogger.log({
action: 'model:update',
details: { modelId: id, changes: Object.keys(definition) },
timestamp: new Date()
});
// 触发事件
this.emit('model:updated', { model, changes: definition });
return model;
}
async deleteModel(id: string): Promise<void> {
const model = this.models.get(id);
if (!model) {
throw new Error(`数据模型不存在: ${id}`);
}
// 检查是否有其他模型依赖此模型
const dependentModels = this.findDependentModels(id);
if (dependentModels.length > 0) {
throw new Error(`无法删除模型,存在依赖关系: ${dependentModels.map(m => m.name).join(', ')}`);
}
this.models.delete(id);
// 记录审计日志
await this.auditLogger.log({
action: 'model:delete',
details: { modelId: id, name: model.name },
timestamp: new Date()
});
// 触发事件
this.emit('model:deleted', { model });
}
async getModel(id: string): Promise<DataModel | null> {
return this.models.get(id) || null;
}
async queryModels(query: ModelQuery): Promise<DataModel[]> {
let results = Array.from(this.models.values());
// 应用过滤条件
if (query.status) {
results = results.filter(model => model.status === query.status);
}
if (query.search) {
const searchLower = query.search.toLowerCase();
results = results.filter(model =>
model.name.toLowerCase().includes(searchLower) ||
model.displayName.toLowerCase().includes(searchLower) ||
(model.description && model.description.toLowerCase().includes(searchLower))
);
}
if (query.tags && query.tags.length > 0) {
results = results.filter(model =>
query.tags!.some(tag => model.metadata.tags?.includes(tag))
);
}
// 排序
if (query.sortBy) {
results.sort((a, b) => {
const aValue = (a as any)[query.sortBy!];
const bValue = (b as any)[query.sortBy!];
if (query.sortOrder === 'desc') {
return bValue > aValue ? 1 : -1;
}
return aValue > bValue ? 1 : -1;
});
}
// 分页
const offset = query.offset || 0;
const limit = query.limit || 50;
return results.slice(offset, offset + limit);
}
async addField(modelId: string, field: FieldDefinition): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
// 检查字段名是否重复
if (model.fields.some(f => f.name === field.name)) {
throw new Error(`字段名已存在: ${field.name}`);
}
// 添加字段
const newField = {
...field,
id: field.id || generateId()
};
model.fields.push(newField);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:field:add',
details: { modelId, fieldName: field.name, fieldType: field.type },
timestamp: new Date()
});
// 触发事件
this.emit('model:field:added', { model, field: newField });
return model;
}
async updateField(modelId: string, fieldId: string, field: Partial<FieldDefinition>): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const fieldIndex = model.fields.findIndex(f => f.id === fieldId);
if (fieldIndex === -1) {
throw new Error(`字段不存在: ${fieldId}`);
}
// 更新字段
Object.assign(model.fields[fieldIndex], field);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:field:update',
details: { modelId, fieldId, changes: Object.keys(field) },
timestamp: new Date()
});
// 触发事件
this.emit('model:field:updated', { model, field: model.fields[fieldIndex], changes: field });
return model;
}
async removeField(modelId: string, fieldId: string): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const fieldIndex = model.fields.findIndex(f => f.id === fieldId);
if (fieldIndex === -1) {
throw new Error(`字段不存在: ${fieldId}`);
}
const removedField = model.fields[fieldIndex];
// 检查字段是否被关系或约束使用
const isUsedInRelations = model.relations.some(r =>
r.sourceField === removedField.name || r.targetField === removedField.name
);
const isUsedInConstraints = model.constraints.some(c =>
c.fields.includes(removedField.name)
);
if (isUsedInRelations || isUsedInConstraints) {
throw new Error(`字段正在被关系或约束使用,无法删除: ${removedField.name}`);
}
// 删除字段
model.fields.splice(fieldIndex, 1);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:field:remove',
details: { modelId, fieldId, fieldName: removedField.name },
timestamp: new Date()
});
// 触发事件
this.emit('model:field:removed', { model, field: removedField });
return model;
}
async addRelation(modelId: string, relation: RelationDefinition): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
// 验证关系定义
this.validateRelation(model, relation);
// 添加关系
const newRelation = {
...relation,
id: relation.id || generateId()
};
model.relations.push(newRelation);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:relation:add',
details: { modelId, relationType: relation.type, targetModel: relation.targetModel },
timestamp: new Date()
});
// 触发事件
this.emit('model:relation:added', { model, relation: newRelation });
return model;
}
async updateRelation(modelId: string, relationId: string, relation: Partial<RelationDefinition>): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const relationIndex = model.relations.findIndex(r => r.id === relationId);
if (relationIndex === -1) {
throw new Error(`关系不存在: ${relationId}`);
}
// 更新关系
Object.assign(model.relations[relationIndex], relation);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:relation:update',
details: { modelId, relationId, changes: Object.keys(relation) },
timestamp: new Date()
});
// 触发事件
this.emit('model:relation:updated', { model, relation: model.relations[relationIndex], changes: relation });
return model;
}
async removeRelation(modelId: string, relationId: string): Promise<DataModel> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const relationIndex = model.relations.findIndex(r => r.id === relationId);
if (relationIndex === -1) {
throw new Error(`关系不存在: ${relationId}`);
}
const removedRelation = model.relations[relationIndex];
// 删除关系
model.relations.splice(relationIndex, 1);
model.version += 1;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:relation:remove',
details: { modelId, relationId, targetModel: removedRelation.targetModel },
timestamp: new Date()
});
// 触发事件
this.emit('model:relation:removed', { model, relation: removedRelation });
return model;
}
validateModel(definition: ModelDefinition): ValidationResult {
return this.validationEngine.validateModel(definition);
}
async generateSchema(modelId: string, targetType: string): Promise<string> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const generator = this.schemaGenerators.get(targetType);
if (!generator) {
throw new Error(`不支持的模式类型: ${targetType}`);
}
return generator.generate(model);
}
async syncToDataSource(modelId: string, dataSourceId: string): Promise<SyncResult> {
const model = this.models.get(modelId);
if (!model) {
throw new Error(`数据模型不存在: ${modelId}`);
}
const dataSource = await this.dataSourceManager.getDataSource(dataSourceId);
if (!dataSource) {
throw new Error(`数据源不存在: ${dataSourceId}`);
}
try {
// 生成对应数据源类型的模式
const schema = await this.generateSchema(modelId, dataSource.type);
// 执行模式同步
const result = await this.dataSourceManager.executeCommand(dataSourceId, {
type: CommandType.DDL,
sql: schema
});
if (result.success) {
// 更新模型状态
model.status = ModelStatus.PUBLISHED;
model.updatedAt = new Date();
// 记录审计日志
await this.auditLogger.log({
action: 'model:sync',
details: { modelId, dataSourceId, success: true },
timestamp: new Date()
});
// 触发事件
this.emit('model:synced', { model, dataSource, schema });
return {
success: true,
schema,
executionTime: result.executionTime
};
} else {
return {
success: false,
error: result.message,
executionTime: result.executionTime
};
}
} catch (error) {
// 记录错误日志
await this.auditLogger.log({
action: 'model:sync:error',
details: { modelId, dataSourceId, error: error.message },
timestamp: new Date()
});
return {
success: false,
error: error.message
};
}
}
// 事件处理
on(event: string, listener: Function): void {
this.eventBus.on(event, listener);
}
off(event: string, listener: Function): void {
this.eventBus.off(event, listener);
}
emit(event: string, data: any): void {
this.eventBus.emit(event, data);
}
// 私有方法
private findDependentModels(modelId: string): DataModel[] {
const dependentModels: DataModel[] = [];
for (const model of this.models.values()) {
if (model.id === modelId) continue;
const hasDependency = model.relations.some(relation =>
relation.targetModel === modelId
);
if (hasDependency) {
dependentModels.push(model);
}
}
return dependentModels;
}
private validateRelation(model: DataModel, relation: RelationDefinition): void {
// 检查源字段是否存在
if (!model.fields.some(f => f.name === relation.sourceField)) {
throw new Error(`源字段不存在: ${relation.sourceField}`);
}
// 检查目标模型是否存在
if (!this.models.has(relation.targetModel)) {
throw new Error(`目标模型不存在: ${relation.targetModel}`);
}
// 检查目标字段是否存在
const targetModel = this.models.get(relation.targetModel)!;
if (!targetModel.fields.some(f => f.name === relation.targetField)) {
throw new Error(`目标字段不存在: ${relation.targetField}`);
}
}
}