工作流引擎是低代码平台的核心组件之一，它负责管理和执行复杂的业务流程。本章将详细介绍工作流引擎的设计与实现，包括流程定义、节点管理、流程执行、状态管理等核心功能。

8.1 工作流引擎架构概览

8.1.1 整体架构设计

/**
 * Contract exposed by the workflow engine: workflow definition CRUD,
 * instance lifecycle control, task operations and event subscription.
 * Every management operation is asynchronous and returns a promise.
 */
interface WorkflowEngine {
  // Workflow definition management
  createWorkflow(definition: WorkflowDefinition): Promise<Workflow>;
  getWorkflow(id: string): Promise<Workflow | null>;
  updateWorkflow(id: string, definition: WorkflowDefinition): Promise<void>;
  deleteWorkflow(id: string): Promise<void>;
  
  // Workflow instance management
  startWorkflow(workflowId: string, context: WorkflowContext): Promise<WorkflowInstance>;
  getWorkflowInstance(instanceId: string): Promise<WorkflowInstance | null>;
  pauseWorkflowInstance(instanceId: string): Promise<void>;
  resumeWorkflowInstance(instanceId: string): Promise<void>;
  terminateWorkflowInstance(instanceId: string): Promise<void>;
  
  // Task management
  completeTask(taskId: string, result: TaskResult): Promise<void>;
  assignTask(taskId: string, assignee: string): Promise<void>;
  
  // Event handling: subscribe, unsubscribe and publish by event name
  on(event: string, handler: Function): void;
  off(event: string, handler: Function): void;
  emit(event: string, data: any): void;
}

/**
 * In-memory implementation of the WorkflowEngine contract.
 *
 * Workflow definitions, instances and tasks live in Maps owned by this object.
 * Execution is delegated to an ExecutionEngine, and lifecycle notifications
 * are published on the EventBus.
 *
 * NOTE(review): nothing in this class inserts into the private `tasks` map, so
 * as written `completeTask` / `assignTask` always reject with "not found" —
 * confirm where engine-level tasks are meant to be registered.
 */
class LowCodeWorkflowEngine implements WorkflowEngine {
  private workflows: Map<string, Workflow> = new Map();
  private instances: Map<string, WorkflowInstance> = new Map();
  private tasks: Map<string, Task> = new Map();
  private nodeRegistry: NodeRegistry;
  private eventBus: EventBus;
  private executionEngine: ExecutionEngine;
  private stateManager: WorkflowStateManager;

  /**
   * @param options Optional collaborators. Every field is optional, so the
   *   argument itself may be omitted; defaults are created for the node
   *   registry and the event bus.
   */
  constructor(options: WorkflowEngineOptions = {}) {
    this.nodeRegistry = options.nodeRegistry ?? new DefaultNodeRegistry();
    this.eventBus = options.eventBus ?? new EventBus();
    // NOTE(review): the execution engine only receives the event bus. If it
    // builds its own node registry, node types registered on `nodeRegistry`
    // here will not be visible to it — confirm how registries are shared.
    this.executionEngine = new ExecutionEngine(this.eventBus);
    this.stateManager = new WorkflowStateManager();

    this.initializeEngine();
  }

  /** Registers the built-in node types and subscribes to node-level events. */
  private initializeEngine(): void {
    this.registerBuiltInNodes();
    this.bindEvents();
  }

  /**
   * Validates a definition, stores it as a Workflow and emits 'workflow:created'.
   *
   * @param definition Workflow definition; `id` and `version` are defaulted when absent.
   * @returns The stored workflow.
   * @throws Error when the definition fails validation.
   */
  async createWorkflow(definition: WorkflowDefinition): Promise<Workflow> {
    this.validateWorkflowDefinition(definition);

    const workflow = new Workflow({
      id: definition.id || this.generateId(),
      name: definition.name,
      description: definition.description,
      version: definition.version || '1.0.0',
      nodes: definition.nodes,
      edges: definition.edges,
      variables: definition.variables ?? {},
      settings: definition.settings ?? {}
    });

    this.workflows.set(workflow.id, workflow);
    this.eventBus.emit('workflow:created', { workflow });

    return workflow;
  }

  /** Resolves to the workflow with the given id, or null when it is unknown. */
  async getWorkflow(id: string): Promise<Workflow | null> {
    return this.workflows.get(id) ?? null;
  }

  /**
   * Replaces the content of an existing workflow and emits 'workflow:updated'.
   *
   * @throws Error when the workflow does not exist or the definition is invalid.
   */
  async updateWorkflow(id: string, definition: WorkflowDefinition): Promise<void> {
    const workflow = this.workflows.get(id);
    if (!workflow) {
      throw new Error(`Workflow with id ${id} not found`);
    }

    this.validateWorkflowDefinition(definition);
    workflow.update(definition);

    this.eventBus.emit('workflow:updated', { workflow });
  }

  /**
   * Removes a workflow and emits 'workflow:deleted'.
   *
   * Deletion is refused while any instance of the workflow is RUNNING or
   * PAUSED: a paused instance still needs its definition in order to resume.
   *
   * @throws Error when the workflow does not exist or still has active instances.
   */
  async deleteWorkflow(id: string): Promise<void> {
    const workflow = this.workflows.get(id);
    if (!workflow) {
      throw new Error(`Workflow with id ${id} not found`);
    }

    const hasActiveInstances = Array.from(this.instances.values()).some(
      instance =>
        instance.workflowId === id &&
        (instance.status === WorkflowStatus.RUNNING || instance.status === WorkflowStatus.PAUSED)
    );
    if (hasActiveInstances) {
      throw new Error('Cannot delete workflow with running or paused instances');
    }

    this.workflows.delete(id);
    this.eventBus.emit('workflow:deleted', { workflowId: id });
  }

  /**
   * Creates a RUNNING instance of a workflow and hands it to the execution engine.
   *
   * Instance variables are the workflow's variables overlaid with
   * `context.variables`.
   *
   * @returns The instance once `startExecution` has resolved.
   * @throws Error when the workflow is unknown, or whatever the execution
   *   engine throws (the instance is marked FAILED first).
   */
  async startWorkflow(workflowId: string, context: WorkflowContext): Promise<WorkflowInstance> {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow with id ${workflowId} not found`);
    }

    const instance = new WorkflowInstance({
      id: this.generateId(),
      workflowId: workflowId,
      status: WorkflowStatus.RUNNING,
      context: context,
      startTime: new Date(),
      currentNodes: [],
      completedNodes: [],
      variables: { ...workflow.variables, ...context.variables }
    });

    this.instances.set(instance.id, instance);

    // Announce the start before executing: execution may publish node and
    // workflow completion events before `startExecution` resolves, and
    // listeners should never see 'workflow:completed' ahead of 'workflow:started'.
    this.eventBus.emit('workflow:started', { workflow, instance });

    try {
      await this.executionEngine.startExecution(workflow, instance);
    } catch (error) {
      // Do not leave the instance stuck in RUNNING (which would also block
      // deleteWorkflow forever) when execution could not be started.
      if (!this.isFinished(instance)) {
        this.failInstance(instance, error instanceof Error ? error : new Error(String(error)));
      }
      throw error;
    }

    return instance;
  }

  /** Resolves to the instance with the given id, or null when it is unknown. */
  async getWorkflowInstance(instanceId: string): Promise<WorkflowInstance | null> {
    return this.instances.get(instanceId) ?? null;
  }

  /**
   * Pauses a RUNNING instance and emits 'workflow:paused'.
   *
   * @throws Error when the instance is unknown or not RUNNING.
   */
  async pauseWorkflowInstance(instanceId: string): Promise<void> {
    const instance = this.instances.get(instanceId);
    if (!instance) {
      throw new Error(`Workflow instance with id ${instanceId} not found`);
    }

    if (instance.status !== WorkflowStatus.RUNNING) {
      throw new Error('Can only pause running workflow instances');
    }

    instance.status = WorkflowStatus.PAUSED;
    instance.pauseTime = new Date();

    await this.executionEngine.pauseExecution(instanceId);

    this.eventBus.emit('workflow:paused', { instance });
  }

  /**
   * Resumes a PAUSED instance and emits 'workflow:resumed'.
   *
   * @throws Error when the instance is unknown or not PAUSED.
   */
  async resumeWorkflowInstance(instanceId: string): Promise<void> {
    const instance = this.instances.get(instanceId);
    if (!instance) {
      throw new Error(`Workflow instance with id ${instanceId} not found`);
    }

    if (instance.status !== WorkflowStatus.PAUSED) {
      throw new Error('Can only resume paused workflow instances');
    }

    instance.status = WorkflowStatus.RUNNING;
    instance.resumeTime = new Date();

    await this.executionEngine.resumeExecution(instanceId);

    this.eventBus.emit('workflow:resumed', { instance });
  }

  /**
   * Terminates an instance and emits 'workflow:terminated'.
   *
   * Instances that already reached a terminal state (COMPLETED, FAILED or
   * TERMINATED) are left untouched so their recorded outcome and end time are
   * not overwritten; the call is then a no-op.
   *
   * @throws Error when the instance is unknown.
   */
  async terminateWorkflowInstance(instanceId: string): Promise<void> {
    const instance = this.instances.get(instanceId);
    if (!instance) {
      throw new Error(`Workflow instance with id ${instanceId} not found`);
    }

    if (this.isFinished(instance)) {
      return;
    }

    instance.status = WorkflowStatus.TERMINATED;
    instance.endTime = new Date();

    await this.executionEngine.terminateExecution(instanceId);

    this.eventBus.emit('workflow:terminated', { instance });
  }

  /**
   * Marks a task as completed, lets the execution engine continue the owning
   * instance from the task's node, and emits 'task:completed'.
   *
   * @throws Error when the task is unknown or was already completed/cancelled
   *   (completing it again would continue the workflow a second time).
   */
  async completeTask(taskId: string, result: TaskResult): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    if (task.status === TaskStatus.COMPLETED || task.status === TaskStatus.CANCELLED) {
      throw new Error(`Task with id ${taskId} is already ${task.status}`);
    }

    task.status = TaskStatus.COMPLETED;
    task.result = result;
    task.completedTime = new Date();

    await this.executionEngine.continueExecution(task.instanceId, task.nodeId, result);

    this.eventBus.emit('task:completed', { task, result });
  }

  /**
   * Records the assignee of a task and emits 'task:assigned'.
   *
   * @throws Error when the task is unknown.
   */
  async assignTask(taskId: string, assignee: string): Promise<void> {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    task.assignee = assignee;
    task.assignedTime = new Date();

    this.eventBus.emit('task:assigned', { task, assignee });
  }

  /** Subscribes a handler on the underlying event bus. */
  on(event: string, handler: Function): void {
    this.eventBus.on(event, handler);
  }

  /** Unsubscribes a handler from the underlying event bus. */
  off(event: string, handler: Function): void {
    this.eventBus.off(event, handler);
  }

  /** Publishes an event on the underlying event bus. */
  emit(event: string, data: any): void {
    this.eventBus.emit(event, data);
  }

  /**
   * Structural checks applied before a definition is stored: a name, at least
   * one node, exactly one START node, at least one END node, and edges whose
   * endpoints exist.
   *
   * @throws Error describing the first violated rule.
   */
  private validateWorkflowDefinition(definition: WorkflowDefinition): void {
    if (!definition.name) {
      throw new Error('Workflow name is required');
    }

    if (!definition.nodes || definition.nodes.length === 0) {
      throw new Error('Workflow must have at least one node');
    }

    const startNodes = definition.nodes.filter(node => node.type === NodeType.START);
    if (startNodes.length !== 1) {
      throw new Error('Workflow must have exactly one start node');
    }

    const endNodes = definition.nodes.filter(node => node.type === NodeType.END);
    if (endNodes.length === 0) {
      throw new Error('Workflow must have at least one end node');
    }

    this.validateNodeConnections(definition.nodes, definition.edges);
  }

  /**
   * Ensures every edge starts and ends at a node that exists in `nodes`.
   *
   * @throws Error when `edges` is not an array or an endpoint is unknown.
   */
  private validateNodeConnections(nodes: WorkflowNode[], edges: WorkflowEdge[]): void {
    // Definitions may arrive as untyped JSON; fail with a clear message rather
    // than a TypeError from iterating `undefined`.
    if (!Array.isArray(edges)) {
      throw new Error('Workflow edges must be an array');
    }

    const nodeIds = new Set(nodes.map(node => node.id));

    for (const edge of edges) {
      if (!nodeIds.has(edge.sourceId)) {
        throw new Error(`Source node ${edge.sourceId} not found`);
      }

      if (!nodeIds.has(edge.targetId)) {
        throw new Error(`Target node ${edge.targetId} not found`);
      }
    }
  }

  /** Registers the node implementations shipped with the engine. */
  private registerBuiltInNodes(): void {
    this.nodeRegistry.register(NodeType.START, StartNode);
    this.nodeRegistry.register(NodeType.END, EndNode);
    this.nodeRegistry.register(NodeType.USER_TASK, UserTaskNode);
    this.nodeRegistry.register(NodeType.SERVICE_TASK, ServiceTaskNode);
    // Gateways
    this.nodeRegistry.register(NodeType.EXCLUSIVE_GATEWAY, ExclusiveGatewayNode);
    this.nodeRegistry.register(NodeType.PARALLEL_GATEWAY, ParallelGatewayNode);
    this.nodeRegistry.register(NodeType.SCRIPT_TASK, ScriptTaskNode);
  }

  /** Subscribes to node completion and failure events published on the bus. */
  private bindEvents(): void {
    this.eventBus.on('node:completed', async (event) => {
      await this.handleNodeCompleted(event.instance, event.node, event.result);
    });

    this.eventBus.on('node:failed', async (event) => {
      await this.handleNodeFailed(event.instance, event.node, event.error);
    });
  }

  /**
   * Bookkeeping after a node finished: moves the node from `currentNodes` to
   * `completedNodes` and, when an END node finished, completes the instance.
   */
  private async handleNodeCompleted(instance: WorkflowInstance, node: WorkflowNode, result: any): Promise<void> {
    // Record each node once even if its completion is reported repeatedly.
    if (!instance.completedNodes.includes(node.id)) {
      instance.completedNodes.push(node.id);
    }
    instance.currentNodes = instance.currentNodes.filter(id => id !== node.id);

    // A terminal state is final: a late END node must not turn a FAILED or
    // TERMINATED instance into COMPLETED, nor re-emit 'workflow:completed'.
    if (node.type === NodeType.END && !this.isFinished(instance)) {
      instance.status = WorkflowStatus.COMPLETED;
      instance.endTime = new Date();
      this.eventBus.emit('workflow:completed', { instance });
    }
  }

  /** Fails the owning instance when a node reports an error. */
  private async handleNodeFailed(instance: WorkflowInstance, node: WorkflowNode, error: Error): Promise<void> {
    // Keep the first recorded outcome when several branches report in.
    if (this.isFinished(instance)) {
      return;
    }
    this.failInstance(instance, error);
  }

  /** Marks the instance FAILED, records the error message and emits 'workflow:failed'. */
  private failInstance(instance: WorkflowInstance, error: Error): void {
    instance.status = WorkflowStatus.FAILED;
    instance.endTime = new Date();
    instance.error = error.message;

    this.eventBus.emit('workflow:failed', { instance, error });
  }

  /** True when the instance is COMPLETED, FAILED or TERMINATED. */
  private isFinished(instance: WorkflowInstance): boolean {
    return (
      instance.status === WorkflowStatus.COMPLETED ||
      instance.status === WorkflowStatus.FAILED ||
      instance.status === WorkflowStatus.TERMINATED
    );
  }

  /** Builds an id of the form `wf_<epoch ms>_<up to 9 base-36 chars>`. */
  private generateId(): string {
    // slice(2, 11) takes the same characters as the deprecated substr(2, 9).
    return 'wf_' + Date.now() + '_' + Math.random().toString(36).slice(2, 11);
  }
}

/**
 * Optional collaborators that can be supplied to the workflow engine.
 * All fields are optional.
 */
interface WorkflowEngineOptions {
  nodeRegistry?: NodeRegistry;
  eventBus?: EventBus;
  // NOTE(review): `persistence` and `scheduler` are not read by the
  // LowCodeWorkflowEngine constructor shown in this chapter — confirm usage.
  persistence?: WorkflowPersistence;
  scheduler?: WorkflowScheduler;
}

8.2 核心数据结构

8.2.1 工作流定义

/**
 * Serializable description of a workflow: its nodes, the edges connecting
 * them, and optional variables and settings.
 */
interface WorkflowDefinition {
  id?: string;
  name: string;
  description?: string;
  version?: string;
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
  variables?: Record<string, any>;
  settings?: WorkflowSettings;
}

/**
 * Optional workflow-level settings.
 */
interface WorkflowSettings {
  // NOTE(review): the unit of `timeout` is not stated in this file — confirm.
  timeout?: number;
  retryPolicy?: RetryPolicy;
  errorHandling?: ErrorHandlingPolicy;
  notifications?: NotificationSettings;
}

/**
 * Retry policy.
 * NOTE(review): the unit of `retryDelay` and the exact way `backoffMultiplier`
 * is applied are not shown in this file — confirm against the consumer.
 */
interface RetryPolicy {
  maxRetries: number;
  retryDelay: number;
  backoffMultiplier?: number;
}

/**
 * Error-handling policy: which of the three listed strategies to apply on error.
 */
interface ErrorHandlingPolicy {
  onError: 'terminate' | 'continue' | 'retry';
  errorHandler?: string; // ID of the error-handler node
}

/**
 * Notification settings: lists of notification configs keyed by the
 * lifecycle moment (start, complete, error) they belong to.
 */
interface NotificationSettings {
  onStart?: NotificationConfig[];
  onComplete?: NotificationConfig[];
  onError?: NotificationConfig[];
}

/**
 * A single notification: its channel type, its recipients and an optional template.
 */
interface NotificationConfig {
  type: 'email' | 'sms' | 'webhook';
  recipients: string[];
  template?: string;
}

/**
 * A stored workflow: the graph (nodes and edges), its variables and settings,
 * and creation/update timestamps.
 *
 * The `nodes`, `edges`, `variables` and `settings` objects passed in are
 * stored by reference, not copied.
 */
class Workflow {
  public readonly id: string;
  public name: string;
  public description?: string;
  public version: string;
  public nodes: WorkflowNode[];
  public edges: WorkflowEdge[];
  public variables: Record<string, any>;
  public settings: WorkflowSettings;
  public createdTime: Date;
  public updatedTime: Date;

  /** Copies the option fields onto the workflow and stamps both timestamps with "now". */
  constructor(options: WorkflowOptions) {
    this.id = options.id;
    this.name = options.name;
    this.description = options.description;
    this.version = options.version;
    this.nodes = options.nodes;
    this.edges = options.edges;
    this.variables = options.variables;
    this.settings = options.settings;
    this.createdTime = new Date();
    this.updatedTime = new Date();
  }

  /**
   * Replaces the workflow content with the given definition. The id and
   * `createdTime` are kept; `version` is kept when the definition has none;
   * missing variables/settings become empty objects.
   */
  update(definition: WorkflowDefinition): void {
    this.name = definition.name;
    this.description = definition.description;
    this.version = definition.version || this.version;
    this.nodes = definition.nodes;
    this.edges = definition.edges;
    this.variables = definition.variables ?? {};
    this.settings = definition.settings ?? {};
    this.updatedTime = new Date();
  }

  /** Returns the node with the given id, or undefined. */
  getNode(nodeId: string): WorkflowNode | undefined {
    return this.nodes.find(node => node.id === nodeId);
  }

  /** Returns the edges whose source is the given node. */
  getOutgoingEdges(nodeId: string): WorkflowEdge[] {
    return this.edges.filter(edge => edge.sourceId === nodeId);
  }

  /** Returns the edges whose target is the given node. */
  getIncomingEdges(nodeId: string): WorkflowEdge[] {
    return this.edges.filter(edge => edge.targetId === nodeId);
  }

  /** Returns the first START node, or undefined when there is none. */
  getStartNode(): WorkflowNode | undefined {
    return this.nodes.find(node => node.type === NodeType.START);
  }

  /** Returns every END node. */
  getEndNodes(): WorkflowNode[] {
    return this.nodes.filter(node => node.type === NodeType.END);
  }

  /**
   * Checks the graph and collects every problem found instead of stopping at
   * the first one.
   *
   * Rules: exactly one START node; at least one END node; every edge must
   * start and end at an existing node; every non-END node needs at least one
   * outgoing edge.
   *
   * @returns `valid` is true only when `errors` is empty.
   */
  validate(): ValidationResult {
    const errors: string[] = [];

    const startNodes = this.nodes.filter(node => node.type === NodeType.START);
    if (startNodes.length !== 1) {
      errors.push('Workflow must have exactly one start node');
    }

    const endNodes = this.nodes.filter(node => node.type === NodeType.END);
    if (endNodes.length === 0) {
      errors.push('Workflow must have at least one end node');
    }

    // Dangling edges: without this check an edge pointing at a missing node
    // would satisfy the "has outgoing connections" rule below and the
    // workflow would be reported as valid although it cannot be traversed.
    const nodeIds = new Set(this.nodes.map(node => node.id));
    for (const edge of this.edges) {
      if (!nodeIds.has(edge.sourceId)) {
        errors.push(`Source node ${edge.sourceId} not found`);
      }
      if (!nodeIds.has(edge.targetId)) {
        errors.push(`Target node ${edge.targetId} not found`);
      }
    }

    for (const node of this.nodes) {
      if (node.type !== NodeType.END) {
        const outgoingEdges = this.getOutgoingEdges(node.id);
        if (outgoingEdges.length === 0) {
          errors.push(`Node ${node.id} has no outgoing connections`);
        }
      }
    }

    return {
      valid: errors.length === 0,
      errors
    };
  }
}

/**
 * Constructor options for Workflow. Unlike WorkflowDefinition, `id`,
 * `version`, `variables` and `settings` are required here.
 */
interface WorkflowOptions {
  id: string;
  name: string;
  description?: string;
  version: string;
  nodes: WorkflowNode[];
  edges: WorkflowEdge[];
  variables: Record<string, any>;
  settings: WorkflowSettings;
}

/**
 * Outcome of a validation pass: an overall flag plus the collected error messages.
 */
interface ValidationResult {
  valid: boolean;
  errors: string[];
}

8.2.2 工作流节点

/**
 * Kinds of workflow node, with the string tag stored in definitions.
 * NOTE(review): TIMER, MESSAGE and SIGNAL have no implementation registered by
 * the engine's built-in node registration shown in this chapter — confirm.
 */
enum NodeType {
  START = 'start',
  END = 'end',
  USER_TASK = 'userTask',
  SERVICE_TASK = 'serviceTask',
  SCRIPT_TASK = 'scriptTask',
  EXCLUSIVE_GATEWAY = 'exclusiveGateway',
  PARALLEL_GATEWAY = 'parallelGateway',
  TIMER = 'timer',
  MESSAGE = 'message',
  SIGNAL = 'signal'
}

/**
 * A node of the workflow graph as stored in a definition: identity, type,
 * type-specific `properties`, and layout data (`position`, `style`).
 */
interface WorkflowNode {
  id: string;
  type: NodeType;
  name: string;
  description?: string;
  properties: Record<string, any>;
  position: NodePosition;
  style?: NodeStyle;
}

/**
 * Node position as an x/y pair.
 * NOTE(review): coordinate system and unit are not stated in this file.
 */
interface NodePosition {
  x: number;
  y: number;
}

/**
 * Optional visual styling of a node; every field may be omitted.
 */
interface NodeStyle {
  width?: number;
  height?: number;
  backgroundColor?: string;
  borderColor?: string;
  borderWidth?: number;
  borderRadius?: number;
  fontSize?: number;
  fontColor?: string;
}

/**
 * A directed edge from node `sourceId` to node `targetId`.
 * `condition` is an optional expression string; the exclusive gateway in this
 * chapter evaluates it and treats an edge without a condition as its default path.
 */
interface WorkflowEdge {
  id: string;
  sourceId: string;
  targetId: string;
  condition?: string;
  label?: string;
  style?: EdgeStyle;
}

/**
 * Optional visual styling of an edge; every field may be omitted.
 */
interface EdgeStyle {
  strokeColor?: string;
  strokeWidth?: number;
  strokeDasharray?: string;
  markerEnd?: string;
}

/**
 * Abstract base class for executable nodes. It captures the identity and
 * properties of a WorkflowNode definition and offers property accessors to
 * subclasses, which implement `execute`.
 */
abstract class BaseNode {
  protected id: string;
  protected type: NodeType;
  protected name: string;
  protected properties: Record<string, any>;

  /**
   * @param node Node definition to execute. Its `properties` are shallow-copied
   *   so that `setProperty` changes this runtime node only and never mutates
   *   the shared workflow definition; a missing `properties` becomes `{}`.
   */
  constructor(node: WorkflowNode) {
    this.id = node.id;
    this.type = node.type;
    this.name = node.name;
    this.properties = { ...node.properties };
  }

  /**
   * Runs the node.
   *
   * Declared without `async`: TypeScript rejects `abstract async` (TS1243).
   * Implementations are still free to be `async` methods.
   *
   * @param context Workflow, instance, variables and engine for this run.
   * @returns The execution result produced by the concrete node.
   */
  abstract execute(context: ExecutionContext): Promise<ExecutionResult>;

  /**
   * Reads a property, falling back to `defaultValue` only when the stored
   * value is null or undefined (so `0`, `''` and `false` are returned as-is).
   */
  protected getProperty(key: string, defaultValue?: any): any {
    return this.properties[key] ?? defaultValue;
  }

  /** Sets a property on this runtime node. */
  protected setProperty(key: string, value: any): void {
    this.properties[key] = value;
  }
}

/**
 * Start node: does no work of its own. It succeeds immediately, passes the
 * current variables through as data and activates every node reachable over
 * its outgoing edges.
 */
class StartNode extends BaseNode {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const nextNodes = this.getOutgoingNodes(context);
    return { success: true, data: context.variables, nextNodes };
  }

  /** Collects the target ids of all edges leaving this node. */
  private getOutgoingNodes(context: ExecutionContext): string[] {
    const targets: string[] = [];
    for (const edge of context.workflow.getOutgoingEdges(this.id)) {
      targets.push(edge.targetId);
    }
    return targets;
  }
}

/**
 * Script task node: runs the script stored in the node's `script` property
 * and, when `resultVariable` is set, stores the script's return value in that
 * workflow variable.
 *
 * SECURITY NOTE(review): JavaScript is compiled with the `Function`
 * constructor. That is NOT a sandbox — the script can still reach the global
 * scope — so only trusted script sources must reach this node. Use an
 * isolated runtime if scripts can come from untrusted users.
 */
class ScriptTaskNode extends BaseNode {
  /**
   * Executes the configured script.
   *
   * @returns `success: true` with the script result and the outgoing node ids,
   *   or `success: false` with the error message; this method does not throw.
   */
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      const scriptType = this.getProperty('scriptType', 'javascript');
      const script = this.getProperty('script');
      const resultVariable = this.getProperty('resultVariable');

      if (!script) {
        throw new Error('Script is required for script task');
      }

      const result = await this.executeScript(scriptType, script, context.variables);

      // Publish the result to the workflow variables when requested.
      if (resultVariable) {
        context.variables[resultVariable] = result;
      }

      return {
        success: true,
        data: result,
        nextNodes: this.getOutgoingNodes(context)
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /** Dispatches to the executor for the given script type. */
  private async executeScript(scriptType: string, script: string, variables: Record<string, any>): Promise<any> {
    switch (scriptType) {
      case 'javascript':
        return this.executeJavaScript(script, variables);
      case 'python':
        return this.executePython(script, variables);
      default:
        throw new Error(`Unsupported script type: ${scriptType}`);
    }
  }

  /**
   * Runs a JavaScript snippet as the body of a function and resolves to what
   * the snippet returns. The snippet sees `variables`, a logging `console`,
   * `Math`, `Date` and `JSON` as named bindings (see the class-level security
   * note: this does not restrict access to other globals).
   *
   * @throws Error prefixed with "Script execution failed:" when the script
   *   throws or returns a rejected promise.
   */
  private async executeJavaScript(script: string, variables: Record<string, any>): Promise<any> {
    const scope = {
      variables,
      console: {
        log: (...args: any[]) => console.log('[Script]', ...args)
      },
      Math,
      Date,
      JSON
    };

    // `return` and the wrapped function must sit on the same line: a line
    // break directly after `return` triggers automatic semicolon insertion,
    // which would return undefined without ever running the script.
    // The newline after the script keeps a trailing `//` comment in the
    // script from swallowing the closing `})();`.
    const body = `return (function() {\n${script}\n})();`;

    try {
      // Flagged: dynamic code execution via the Function constructor.
      const func = new Function(...Object.keys(scope), body);
      // Await inside the try so rejections from async scripts are wrapped too.
      return await func(...Object.values(scope));
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      throw new Error(`Script execution failed: ${message}`);
    }
  }

  /**
   * Placeholder for Python execution; always throws. A Python runtime (for
   * example Pyodide or a server-side executor) would have to be integrated here.
   */
  private async executePython(_script: string, _variables: Record<string, any>): Promise<any> {
    throw new Error('Python script execution not implemented');
  }

  /** Collects the target ids of all edges leaving this node. */
  private getOutgoingNodes(context: ExecutionContext): string[] {
    const workflow = context.workflow;
    const outgoingEdges = workflow.getOutgoingEdges(this.id);
    return outgoingEdges.map(edge => edge.targetId);
  }
}

/**
 * Exclusive gateway: follows exactly one outgoing edge — the first edge, in
 * edge order, whose `condition` evaluates truthy; otherwise the first edge
 * without a condition (the default path).
 *
 * SECURITY NOTE(review): conditions are compiled with the `Function`
 * constructor and evaluated inside `with (variables)`. That is NOT a sandbox —
 * a condition can reach the global scope — so only trusted definitions must
 * reach this node.
 */
class ExclusiveGatewayNode extends BaseNode {
  /**
   * Chooses the next node.
   *
   * @returns `success: true` with a single id in `nextNodes`, or
   *   `success: false` when no condition matched and no default path exists;
   *   this method does not throw.
   */
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      const nextNodes = await this.evaluateConditions(context);

      if (nextNodes.length === 0) {
        // No condition matched: fall back to the default path.
        const defaultPath = this.getDefaultPath(context);
        if (defaultPath) {
          nextNodes.push(defaultPath);
        } else {
          throw new Error('No condition matched and no default path defined');
        }
      }

      return {
        success: true,
        data: context.variables,
        nextNodes
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Returns the target of the first conditional edge whose condition holds, as
   * a one-element array, or an empty array when none matches.
   */
  private async evaluateConditions(context: ExecutionContext): Promise<string[]> {
    const workflow = context.workflow;
    const outgoingEdges = workflow.getOutgoingEdges(this.id);

    for (const edge of outgoingEdges) {
      if (edge.condition) {
        const conditionResult = await this.evaluateCondition(edge.condition, context.variables);
        if (conditionResult) {
          return [edge.targetId];
        }
      }
    }

    return [];
  }

  /**
   * Evaluates a condition expression with the workflow variables in scope as
   * bare identifiers. A condition that fails to compile or throws is logged
   * with `console.warn` and treated as not matching (deliberate best effort).
   */
  private async evaluateCondition(condition: string, variables: Record<string, any>): Promise<boolean> {
    try {
      // The expression is parenthesised on the same line as `return`: a line
      // break directly after `return` triggers automatic semicolon insertion
      // and the condition would always evaluate to undefined (i.e. false).
      // Function-constructor bodies are non-strict, so `with` is permitted.
      const func = new Function('variables', `with (variables) { return (${condition}\n); }`);
      return Boolean(func(variables));
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.warn(`Condition evaluation failed: ${message}`);
      return false;
    }
  }

  /** Returns the target of the first outgoing edge without a condition, or null. */
  private getDefaultPath(context: ExecutionContext): string | null {
    const workflow = context.workflow;
    const outgoingEdges = workflow.getOutgoingEdges(this.id);

    const defaultEdge = outgoingEdges.find(edge => !edge.condition);
    return defaultEdge ? defaultEdge.targetId : null;
  }
}

/**
 * Parallel gateway. The `gatewayType` property (default 'fork') selects the
 * behaviour: 'fork' activates every outgoing path; any other value is handled
 * as a join that waits until all incoming source nodes have completed.
 */
class ParallelGatewayNode extends BaseNode {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const mode = this.getProperty('gatewayType', 'fork'); // 'fork' or 'join'
    return mode === 'fork' ? this.executeFork(context) : this.executeJoin(context);
  }

  /** Fork: succeed and activate all outgoing targets. */
  private async executeFork(context: ExecutionContext): Promise<ExecutionResult> {
    const targets = this.getAllOutgoingNodes(context);
    return { success: true, data: context.variables, nextNodes: targets };
  }

  /**
   * Join: proceed only when every source node of an incoming edge is listed in
   * the instance's completed nodes; otherwise report `waiting: true`.
   */
  private async executeJoin(context: ExecutionContext): Promise<ExecutionResult> {
    const finished = context.instance.completedNodes;
    const stillPending = this.getAllIncomingNodes(context).some(sourceId => !finished.includes(sourceId));

    if (stillPending) {
      // Other branches have not arrived yet.
      return { success: true, data: context.variables, waiting: true };
    }

    return {
      success: true,
      data: context.variables,
      nextNodes: this.getAllOutgoingNodes(context)
    };
  }

  /** Target ids of all edges leaving this gateway. */
  private getAllOutgoingNodes(context: ExecutionContext): string[] {
    const targets: string[] = [];
    for (const edge of context.workflow.getOutgoingEdges(this.id)) {
      targets.push(edge.targetId);
    }
    return targets;
  }

  /** Source ids of all edges entering this gateway. */
  private getAllIncomingNodes(context: ExecutionContext): string[] {
    const sources: string[] = [];
    for (const edge of context.workflow.getIncomingEdges(this.id)) {
      sources.push(edge.sourceId);
    }
    return sources;
  }
}

/**
 * Everything a node receives when it executes: the workflow being run, the
 * running instance, the variables map and the engine.
 */
interface ExecutionContext {
  workflow: Workflow;
  instance: WorkflowInstance;
  variables: Record<string, any>;
  engine: WorkflowEngine;
}

/**
 * Result returned by a node's `execute`.
 * `nextNodes` lists the node ids to activate next; `waiting` is set by the
 * parallel gateway's join while branches are still pending.
 * NOTE(review): `completed` and `taskId` are not set by any node shown in this
 * chapter — confirm their producers.
 */
interface ExecutionResult {
  success: boolean;
  data?: any;
  error?: string;
  nextNodes?: string[];
  waiting?: boolean;
  completed?: boolean;
  taskId?: string;
}

8.3 工作流实例管理

8.3.1 工作流实例

/**
 * Lifecycle states of a workflow instance, with their string tags.
 */
enum WorkflowStatus {
  CREATED = 'created',
  RUNNING = 'running',
  PAUSED = 'paused',
  COMPLETED = 'completed',
  FAILED = 'failed',
  TERMINATED = 'terminated'
}

/**
 * One run of a workflow: its status, variables, the nodes currently active
 * and already completed, and lifecycle timestamps.
 */
class WorkflowInstance {
  public readonly id: string;
  public readonly workflowId: string;
  public status: WorkflowStatus;
  public context: WorkflowContext;
  public variables: Record<string, any>;
  public currentNodes: string[];
  public completedNodes: string[];
  public startTime: Date;
  public endTime?: Date;
  public pauseTime?: Date;
  public resumeTime?: Date;
  public error?: string;
  public metadata: Record<string, any>;

  /** Copies every option onto the instance; `metadata` falls back to `{}`. */
  constructor(options: WorkflowInstanceOptions) {
    const {
      id,
      workflowId,
      status,
      context,
      variables,
      currentNodes,
      completedNodes,
      startTime,
      endTime,
      pauseTime,
      resumeTime,
      error,
      metadata
    } = options;

    this.id = id;
    this.workflowId = workflowId;
    this.status = status;
    this.context = context;
    this.variables = variables;
    this.currentNodes = currentNodes;
    this.completedNodes = completedNodes;
    this.startTime = startTime;
    this.endTime = endTime;
    this.pauseTime = pauseTime;
    this.resumeTime = resumeTime;
    this.error = error;
    this.metadata = metadata || {};
  }

  /** True while the status is RUNNING. */
  isRunning(): boolean {
    return this.hasStatus(WorkflowStatus.RUNNING);
  }

  /** True while the status is PAUSED. */
  isPaused(): boolean {
    return this.hasStatus(WorkflowStatus.PAUSED);
  }

  /** True once the status is COMPLETED. */
  isCompleted(): boolean {
    return this.hasStatus(WorkflowStatus.COMPLETED);
  }

  /** True once the status is FAILED. */
  isFailed(): boolean {
    return this.hasStatus(WorkflowStatus.FAILED);
  }

  /** True once the status is TERMINATED. */
  isTerminated(): boolean {
    return this.hasStatus(WorkflowStatus.TERMINATED);
  }

  /**
   * Milliseconds from `startTime` to `endTime`, or to the current time when
   * the instance has no end time yet.
   */
  getDuration(): number {
    const finishedAt = this.endTime ? this.endTime.getTime() : Date.now();
    return finishedAt - this.startTime.getTime();
  }

  /** Reads an instance variable. */
  getVariable(name: string): any {
    return this.variables[name];
  }

  /** Writes an instance variable. */
  setVariable(name: string, value: any): void {
    this.variables[name] = value;
  }

  /** Marks a node as currently active; ids already present are not added twice. */
  addCurrentNode(nodeId: string): void {
    if (this.currentNodes.includes(nodeId)) {
      return;
    }
    this.currentNodes.push(nodeId);
  }

  /** Removes a node from the active list (replaces the array with a filtered copy). */
  removeCurrentNode(nodeId: string): void {
    this.currentNodes = this.currentNodes.filter(activeId => activeId !== nodeId);
  }

  /** Records a node as completed; ids already present are not added twice. */
  addCompletedNode(nodeId: string): void {
    if (this.completedNodes.includes(nodeId)) {
      return;
    }
    this.completedNodes.push(nodeId);
  }

  /** Plain-object snapshot of all fields; values are referenced, not cloned. */
  toJSON(): any {
    const {
      id,
      workflowId,
      status,
      context,
      variables,
      currentNodes,
      completedNodes,
      startTime,
      endTime,
      pauseTime,
      resumeTime,
      error,
      metadata
    } = this;

    return {
      id,
      workflowId,
      status,
      context,
      variables,
      currentNodes,
      completedNodes,
      startTime,
      endTime,
      pauseTime,
      resumeTime,
      error,
      metadata
    };
  }

  /** Compares the current status with the given one. */
  private hasStatus(expected: WorkflowStatus): boolean {
    return this.status === expected;
  }
}

/**
 * Constructor options for WorkflowInstance. The timestamp, error and metadata
 * fields after `startTime` are optional.
 */
interface WorkflowInstanceOptions {
  id: string;
  workflowId: string;
  status: WorkflowStatus;
  context: WorkflowContext;
  variables: Record<string, any>;
  currentNodes: string[];
  completedNodes: string[];
  startTime: Date;
  endTime?: Date;
  pauseTime?: Date;
  resumeTime?: Date;
  error?: string;
  metadata?: Record<string, any>;
}

/**
 * Caller-supplied context for starting a workflow: who initiated it, an
 * optional business key, initial variables and optional metadata.
 */
interface WorkflowContext {
  initiator: string;
  businessKey?: string;
  variables: Record<string, any>;
  metadata?: Record<string, any>;
}

8.3.2 任务管理

/**
 * Kinds of task, with their string tags.
 */
enum TaskType {
  USER_TASK = 'userTask',
  SERVICE_TASK = 'serviceTask',
  SCRIPT_TASK = 'scriptTask',
  MANUAL_TASK = 'manualTask'
}

/**
 * Lifecycle states of a task, with their string tags.
 */
enum TaskStatus {
  CREATED = 'created',
  ASSIGNED = 'assigned',
  IN_PROGRESS = 'inProgress',
  COMPLETED = 'completed',
  CANCELLED = 'cancelled',
  FAILED = 'failed'
}

/**
 * Task priority levels, listed from lowest to highest.
 */
enum TaskPriority {
  LOW = 'low',
  NORMAL = 'normal',
  HIGH = 'high',
  URGENT = 'urgent'
}

/**
 * A unit of work attached to a node (`nodeId`) of a workflow instance
 * (`instanceId`), with assignment data, priority, status and timestamps.
 */
interface Task {
  id: string;
  instanceId: string;
  nodeId: string;
  type: TaskType;
  name: string;
  description?: string;
  assignee?: string;
  candidateUsers: string[];
  candidateGroups: string[];
  dueDate?: Date;
  priority: TaskPriority;
  formKey?: string;
  variables: Record<string, any>;
  status: TaskStatus;
  result?: TaskResult;
  createdTime: Date;
  assignedTime?: Date;
  startTime?: Date;
  completedTime?: Date;
  metadata?: Record<string, any>;
}

/**
 * Outcome reported when a task is completed.
 */
interface TaskResult {
  success: boolean;
  data?: any;
  error?: string;
  variables?: Record<string, any>;
}

/**
 * In-memory task store with per-user and per-group indexes.
 *
 * Tasks are kept in a Map by id. `userTasks` indexes task ids by assignee and
 * candidate user, `groupTasks` by candidate group. Index entries are only ever
 * added, so completed and cancelled tasks remain reachable through them.
 * Every state change is announced on the event bus.
 */
class TaskManager {
  private tasks: Map<string, Task> = new Map();
  private userTasks: Map<string, string[]> = new Map(); // userId -> taskIds
  private groupTasks: Map<string, string[]> = new Map(); // groupId -> taskIds
  private eventBus: EventBus;

  /** @param eventBus Bus on which task events are emitted. */
  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
  }

  /**
   * Stores a task, indexes it under its assignee, candidate users and
   * candidate groups, and emits 'task:created'. A task with an existing id
   * replaces the stored one.
   */
  createTask(task: Task): void {
    this.tasks.set(task.id, task);

    if (task.assignee) {
      this.addUserTask(task.assignee, task.id);
    }

    for (const userId of task.candidateUsers) {
      this.addUserTask(userId, task.id);
    }

    for (const groupId of task.candidateGroups) {
      this.addGroupTask(groupId, task.id);
    }

    this.eventBus.emit('task:created', { task });
  }

  /** Returns the task with the given id, or undefined. */
  getTask(taskId: string): Task | undefined {
    return this.tasks.get(taskId);
  }

  /** Returns the tasks indexed under the user (as assignee or candidate). */
  getUserTasks(userId: string): Task[] {
    const taskIds = this.userTasks.get(userId) || [];
    return taskIds.map(id => this.tasks.get(id)).filter(task => task !== undefined) as Task[];
  }

  /** Returns the tasks indexed under the candidate group. */
  getGroupTasks(groupId: string): Task[] {
    const taskIds = this.groupTasks.get(groupId) || [];
    return taskIds.map(id => this.tasks.get(id)).filter(task => task !== undefined) as Task[];
  }

  /**
   * Assigns a CREATED task to a user, moves it to ASSIGNED and emits 'task:assigned'.
   *
   * @throws Error when the task is unknown or not in CREATED state.
   */
  assignTask(taskId: string, assignee: string): void {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    if (task.status !== TaskStatus.CREATED) {
      throw new Error('Can only assign created tasks');
    }

    task.assignee = assignee;
    task.status = TaskStatus.ASSIGNED;
    task.assignedTime = new Date();

    this.addUserTask(assignee, taskId);

    this.eventBus.emit('task:assigned', { task, assignee });
  }

  /**
   * Moves an ASSIGNED or CREATED task to IN_PROGRESS on behalf of a user and
   * emits 'task:started'.
   *
   * @throws Error when the task is unknown, in another state, or the user is
   *   neither its assignee nor one of its candidate users.
   */
  startTask(taskId: string, userId: string): void {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    if (task.status !== TaskStatus.ASSIGNED && task.status !== TaskStatus.CREATED) {
      throw new Error('Can only start assigned or created tasks');
    }

    if (!this.canUserAccessTask(userId, task)) {
      throw new Error('User does not have permission to start this task');
    }

    task.status = TaskStatus.IN_PROGRESS;
    task.startTime = new Date();

    this.eventBus.emit('task:started', { task, userId });
  }

  /**
   * Completes an IN_PROGRESS task with the given result and emits 'task:completed'.
   *
   * @throws Error when the task is unknown, not IN_PROGRESS, or the user is
   *   neither its assignee nor one of its candidate users.
   */
  completeTask(taskId: string, userId: string, result: TaskResult): void {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    if (task.status !== TaskStatus.IN_PROGRESS) {
      throw new Error('Can only complete in-progress tasks');
    }

    if (!this.canUserAccessTask(userId, task)) {
      throw new Error('User does not have permission to complete this task');
    }

    task.status = TaskStatus.COMPLETED;
    task.result = result;
    task.completedTime = new Date();

    this.eventBus.emit('task:completed', { task, userId, result });
  }

  /**
   * Cancels a task that is neither COMPLETED nor already CANCELLED, stores the
   * optional reason under `metadata.cancelReason` and emits 'task:cancelled'.
   *
   * @throws Error when the task is unknown or already completed/cancelled.
   */
  cancelTask(taskId: string, reason?: string): void {
    const task = this.tasks.get(taskId);
    if (!task) {
      throw new Error(`Task with id ${taskId} not found`);
    }

    if (task.status === TaskStatus.COMPLETED || task.status === TaskStatus.CANCELLED) {
      throw new Error('Cannot cancel completed or already cancelled tasks');
    }

    task.status = TaskStatus.CANCELLED;
    if (reason) {
      task.metadata = { ...task.metadata, cancelReason: reason };
    }

    this.eventBus.emit('task:cancelled', { task, reason });
  }

  /**
   * Filters, sorts and paginates the stored tasks.
   *
   * Filters are combined with AND; a filter whose query field is unset is
   * skipped. Tasks without a due date never match `dueBefore` / `dueAfter`.
   *
   * Sorting (`orderBy`, `orderDirection`) is stable: tasks that compare equal
   * keep their insertion order, and tasks whose sort value is missing are
   * placed last in both directions.
   *
   * Pagination applies when `offset` or `limit` is given: `offset` defaults
   * to 0 and a missing `limit` means "to the end".
   *
   * @returns A new array; the stored tasks themselves are not copied.
   */
  queryTasks(query: TaskQuery): Task[] {
    let tasks = Array.from(this.tasks.values());

    if (query.status) {
      tasks = tasks.filter(task => task.status === query.status);
    }

    if (query.assignee) {
      tasks = tasks.filter(task => task.assignee === query.assignee);
    }

    if (query.candidateUser) {
      tasks = tasks.filter(task => task.candidateUsers.includes(query.candidateUser!));
    }

    if (query.candidateGroup) {
      tasks = tasks.filter(task => task.candidateGroups.includes(query.candidateGroup!));
    }

    if (query.instanceId) {
      tasks = tasks.filter(task => task.instanceId === query.instanceId);
    }

    if (query.dueBefore) {
      tasks = tasks.filter(task => task.dueDate && task.dueDate <= query.dueBefore!);
    }

    if (query.dueAfter) {
      tasks = tasks.filter(task => task.dueDate && task.dueDate >= query.dueAfter!);
    }

    if (query.orderBy) {
      const orderBy = query.orderBy;
      const descending = query.orderDirection === 'desc';
      tasks.sort((a, b) =>
        this.compareForSort(this.getTaskProperty(a, orderBy), this.getTaskProperty(b, orderBy), descending)
      );
    }

    if (query.offset !== undefined || query.limit !== undefined) {
      const start = query.offset ?? 0;
      const end = query.limit !== undefined ? start + query.limit : undefined;
      tasks = tasks.slice(start, end);
    }

    return tasks;
  }

  /**
   * Three-way comparison used by `queryTasks`. It returns 0 for equal values,
   * as `Array.prototype.sort` requires of a consistent comparator, and orders
   * null/undefined after every present value regardless of direction.
   */
  private compareForSort(a: any, b: any, descending: boolean): number {
    const aMissing = a == null;
    const bMissing = b == null;
    if (aMissing || bMissing) {
      if (aMissing && bMissing) {
        return 0;
      }
      return aMissing ? 1 : -1;
    }

    const ascending = a > b ? 1 : a < b ? -1 : 0;
    return descending ? -ascending : ascending;
  }

  /**
   * True when the user is the task's assignee or one of its candidate users.
   * Candidate-group membership is not checked here.
   */
  private canUserAccessTask(userId: string, task: Task): boolean {
    if (task.assignee === userId) {
      return true;
    }

    if (task.candidateUsers.includes(userId)) {
      return true;
    }

    // More elaborate permission checks (for example candidate-group
    // membership) could be added here.
    return false;
  }

  /** Adds the task id to the user's index entry, once. */
  private addUserTask(userId: string, taskId: string): void {
    if (!this.userTasks.has(userId)) {
      this.userTasks.set(userId, []);
    }

    const taskIds = this.userTasks.get(userId)!;
    if (!taskIds.includes(taskId)) {
      taskIds.push(taskId);
    }
  }

  /** Adds the task id to the group's index entry, once. */
  private addGroupTask(groupId: string, taskId: string): void {
    if (!this.groupTasks.has(groupId)) {
      this.groupTasks.set(groupId, []);
    }

    const taskIds = this.groupTasks.get(groupId)!;
    if (!taskIds.includes(taskId)) {
      taskIds.push(taskId);
    }
  }

  /**
   * Resolves the value a task is sorted by. `priority` is mapped to its
   * numeric rank; any other name is read from the task as-is.
   */
  private getTaskProperty(task: Task, property: string): any {
    switch (property) {
      case 'createdTime':
        return task.createdTime;
      case 'dueDate':
        return task.dueDate;
      case 'priority':
        return this.getPriorityValue(task.priority);
      default:
        return (task as any)[property];
    }
  }

  /** Numeric rank of a priority: LOW=1 … URGENT=4, and 0 for anything else. */
  private getPriorityValue(priority: TaskPriority): number {
    switch (priority) {
      case TaskPriority.LOW:
        return 1;
      case TaskPriority.NORMAL:
        return 2;
      case TaskPriority.HIGH:
        return 3;
      case TaskPriority.URGENT:
        return 4;
      default:
        return 0;
    }
  }
}

// Task query
/**
 * Criteria accepted when querying tasks. Every field is optional.
 *
 * NOTE(review): the filtering half of the query implementation is outside
 * this excerpt, so the filter-field descriptions below are inferred from the
 * field names and call sites — confirm against the query implementation.
 */
interface TaskQuery {
  // Presumably restricts results to tasks in this status (the demo queries
  // with TaskStatus.CREATED to find pending tasks).
  status?: TaskStatus;
  // Presumably restricts results to tasks assigned to this user — confirm.
  assignee?: string;
  // Presumably restricts results to tasks listing this candidate user — confirm.
  candidateUser?: string;
  // Presumably restricts results to tasks listing this candidate group — confirm.
  candidateGroup?: string;
  // Presumably restricts results to tasks of this workflow instance (the
  // execution engine queries by instanceId when terminating an instance).
  instanceId?: string;
  // Presumably an upper bound on the task due date — confirm inclusivity.
  dueBefore?: Date;
  // Presumably a lower bound on the task due date — confirm inclusivity.
  dueAfter?: Date;
  // Name of the task property to sort by; 'priority' is compared by numeric
  // rank, any other name is read directly from the task.
  orderBy?: string;
  // 'desc' sorts descending; any other value (including omitted) sorts ascending.
  orderDirection?: 'asc' | 'desc';
  // Index of the first result to return; paging is applied only when both
  // offset and limit are provided.
  offset?: number;
  // Maximum number of results to return; paging is applied only when both
  // offset and limit are provided.
  limit?: number;
}

8.4 执行引擎

8.4.1 执行引擎实现

// Execution engine
/**
 * Walks a workflow graph node by node on behalf of workflow instances.
 *
 * The engine keeps track of the instances it is currently driving, creates
 * user tasks through its own {@link TaskManager}, and resumes an instance
 * when the event bus reports `task:completed` for one of its tasks.
 *
 * Events emitted on the event bus: `node:completed`, `node:failed`,
 * `workflow:completed`, `workflow:failed`.
 */
class ExecutionEngine {
  /** Instances currently being driven, keyed by instance id. */
  private runningInstances: Map<string, WorkflowInstance> = new Map();
  /** Instances set aside by pauseExecution, keyed by instance id. */
  private pausedInstances: Map<string, WorkflowInstance> = new Map();
  /**
   * Workflow definitions seen by startExecution, keyed by workflow id, so a
   * waiting instance can be continued later without an external store.
   */
  private workflows: Map<string, Workflow> = new Map();
  private nodeRegistry: NodeRegistry;
  private eventBus: EventBus;
  private taskManager: TaskManager;

  /**
   * @param eventBus - Bus used to emit engine events and to listen for
   *   `task:completed`.
   * @param nodeRegistry - Registry used to look up node executors. Optional
   *   for backward compatibility; when omitted a fresh, empty
   *   DefaultNodeRegistry is created, in which case no node type can be
   *   resolved, so callers should pass the registry they register nodes on.
   */
  constructor(eventBus: EventBus, nodeRegistry?: NodeRegistry) {
    this.eventBus = eventBus;
    this.nodeRegistry = nodeRegistry ?? new DefaultNodeRegistry();
    this.taskManager = new TaskManager(eventBus);

    this.bindEvents();
  }

  /**
   * Starts executing an instance from the workflow's start node.
   *
   * @throws Error when the workflow has no start node. The instance is not
   *   registered as running in that case.
   */
  async startExecution(workflow: Workflow, instance: WorkflowInstance): Promise<void> {
    // Validate before registering so a rejected start leaves no stale entry.
    const startNode = workflow.getStartNode();
    if (!startNode) {
      throw new Error('Workflow has no start node');
    }

    this.runningInstances.set(instance.id, instance);
    // Remember the definition so continueExecution can find it later.
    this.workflows.set(instance.workflowId, workflow);

    await this.executeNode(workflow, instance, startNode);
  }

  /**
   * Stops tracking an instance as running and parks it until it is resumed.
   * Updating the instance's own status is left to the caller.
   *
   * @throws Error when the instance is not currently running.
   */
  async pauseExecution(instanceId: string): Promise<void> {
    const instance = this.runningInstances.get(instanceId);
    if (!instance) {
      throw new Error(`Instance ${instanceId} not found`);
    }

    this.runningInstances.delete(instanceId);
    // Keep a handle so resumeExecution can put the instance back.
    this.pausedInstances.set(instanceId, instance);
  }

  /**
   * Puts a previously paused instance back among the running instances so
   * that later task completions can continue it. Unknown ids are tolerated.
   */
  async resumeExecution(instanceId: string): Promise<void> {
    const instance = this.pausedInstances.get(instanceId);
    if (instance) {
      this.pausedInstances.delete(instanceId);
      this.runningInstances.set(instanceId, instance);
    }

    console.log(`Resuming execution for instance ${instanceId}`);
  }

  /**
   * Stops tracking an instance (running or paused) and cancels every task of
   * that instance that is neither completed nor already cancelled. Does
   * nothing when the engine is not tracking the instance.
   */
  async terminateExecution(instanceId: string): Promise<void> {
    const wasRunning = this.runningInstances.delete(instanceId);
    const wasPaused = this.pausedInstances.delete(instanceId);
    if (!wasRunning && !wasPaused) {
      return; // the engine is not tracking this instance
    }

    // Cancel the instance's outstanding tasks.
    const tasks = this.taskManager.queryTasks({ instanceId });
    for (const task of tasks) {
      if (task.status !== TaskStatus.COMPLETED && task.status !== TaskStatus.CANCELLED) {
        this.taskManager.cancelTask(task.id, 'Workflow terminated');
      }
    }
  }

  /**
   * Continues an instance after a waiting node (such as a user task) has
   * finished: marks the node completed and executes its successors.
   *
   * @param instanceId - Id of a running instance.
   * @param nodeId - Id of the node that just finished.
   * @param result - Outcome of the node; when it carries `nextNodes`, those
   *   ids are used instead of the node's outgoing edges.
   * @throws Error when the instance is not running or its workflow
   *   definition is unknown to the engine.
   */
  async continueExecution(instanceId: string, nodeId: string, result: any): Promise<void> {
    const instance = this.runningInstances.get(instanceId);
    if (!instance) {
      throw new Error(`Instance ${instanceId} not found`);
    }

    const workflow = await this.getWorkflow(instance.workflowId);
    if (!workflow) {
      throw new Error(`Workflow ${instance.workflowId} not found`);
    }

    // Move the finished node from "current" to "completed".
    instance.removeCurrentNode(nodeId);
    instance.addCompletedNode(nodeId);

    for (const nextNodeId of this.getNextNodes(workflow, nodeId, result)) {
      // A previous successor may have failed or completed the workflow.
      if (this.hasEnded(instance)) {
        break;
      }
      const nextNode = workflow.getNode(nextNodeId);
      if (nextNode) {
        await this.executeNode(workflow, instance, nextNode);
      }
    }
  }

  /**
   * Executes one node: marks it current, instantiates its executor from the
   * registry, runs it and processes the result. Any error, including a
   * missing executor or a failed result, is routed to handleExecutionError
   * and never propagates to the caller.
   */
  private async executeNode(workflow: Workflow, instance: WorkflowInstance, node: WorkflowNode): Promise<void> {
    try {
      instance.addCurrentNode(node.id);

      const context: ExecutionContext = {
        workflow,
        instance,
        variables: instance.variables,
        engine: this as any // cast: a proper engine interface should be used in real code
      };

      const nodeExecutor = this.nodeRegistry.getNode(node.type);
      if (!nodeExecutor) {
        throw new Error(`No executor found for node type: ${node.type}`);
      }

      const nodeInstance = new nodeExecutor(node);
      const result = await nodeInstance.execute(context);

      await this.handleExecutionResult(workflow, instance, node, result);
    } catch (error) {
      // Anything can be thrown in JavaScript; normalise so `.message` is defined.
      const failure = error instanceof Error ? error : new Error(String(error));
      await this.handleExecutionError(workflow, instance, node, failure);
    }
  }

  /**
   * Reacts to a node's result.
   *
   * - success + `completed`: marks the workflow completed and emits
   *   `workflow:completed`.
   * - success + `waiting`: does nothing; execution continues when the
   *   related task completes.
   * - success + `nextNodes`: executes each listed node in order.
   *
   * @throws Error when the result reports failure (handled by executeNode).
   */
  private async handleExecutionResult(
    workflow: Workflow,
    instance: WorkflowInstance,
    node: WorkflowNode,
    result: ExecutionResult
  ): Promise<void> {
    if (!result.success) {
      throw new Error(result.error || 'Node execution failed');
    }

    this.eventBus.emit('node:completed', { workflow, instance, node, result });

    if (result.completed) {
      instance.status = WorkflowStatus.COMPLETED;
      instance.endTime = new Date();
      this.runningInstances.delete(instance.id);
      this.pausedInstances.delete(instance.id);

      this.eventBus.emit('workflow:completed', { workflow, instance });
    } else if (result.waiting) {
      // Waiting for an external event (e.g. a user task identified by
      // result.taskId); the task:completed listener continues the instance.
    } else if (result.nextNodes) {
      for (const nextNodeId of result.nextNodes) {
        // Do not keep scheduling nodes once a sibling has failed or
        // completed the workflow.
        if (this.hasEnded(instance)) {
          break;
        }
        const nextNode = workflow.getNode(nextNodeId);
        if (nextNode) {
          await this.executeNode(workflow, instance, nextNode);
        }
      }
    }
  }

  /**
   * Marks the instance as failed, stops tracking it and emits `node:failed`
   * followed by `workflow:failed`.
   */
  private async handleExecutionError(
    workflow: Workflow,
    instance: WorkflowInstance,
    node: WorkflowNode,
    error: Error
  ): Promise<void> {
    instance.status = WorkflowStatus.FAILED;
    instance.endTime = new Date();
    instance.error = error.message;

    this.runningInstances.delete(instance.id);
    this.pausedInstances.delete(instance.id);

    this.eventBus.emit('node:failed', { workflow, instance, node, error });
    this.eventBus.emit('workflow:failed', { workflow, instance, error });
  }

  /** True once the instance has been marked completed or failed. */
  private hasEnded(instance: WorkflowInstance): boolean {
    return (
      instance.status === WorkflowStatus.COMPLETED ||
      instance.status === WorkflowStatus.FAILED
    );
  }

  /**
   * Determines which nodes follow `nodeId`: the ids named in
   * `result.nextNodes` when present, otherwise the targets of all outgoing
   * edges of the node.
   */
  private getNextNodes(workflow: Workflow, nodeId: string, result: any): string[] {
    if (result && result.nextNodes) {
      return result.nextNodes;
    }

    return workflow.getOutgoingEdges(nodeId).map(edge => edge.targetId);
  }

  /**
   * Looks up a workflow definition previously passed to startExecution.
   * A definition updated elsewhere is not seen here until the next
   * startExecution call for that workflow id.
   *
   * @returns The definition, or `null` when the engine has never started an
   *   instance of that workflow.
   */
  private async getWorkflow(workflowId: string): Promise<Workflow | null> {
    return this.workflows.get(workflowId) ?? null;
  }

  /** Subscribes to `task:completed` so finished tasks continue their instance. */
  private bindEvents(): void {
    this.eventBus.on('task:completed', async (event) => {
      const { task, result } = event;
      try {
        await this.continueExecution(task.instanceId, task.nodeId, result);
      } catch (error) {
        // The promise returned by an event handler is not awaited by anyone
        // visible here, so a rejection would go unhandled. Report it instead
        // (typical cause: the instance was paused or terminated meanwhile).
        console.error(
          `Failed to continue instance ${task.instanceId} after task ${task.id}:`,
          error
        );
      }
    });
  }

  /** Hands a newly created task to the engine's task manager. */
  addTask(task: Task): void {
    this.taskManager.createTask(task);
  }

  /** Returns the task manager owned by this engine. */
  getTaskManager(): TaskManager {
    return this.taskManager;
  }
}

8.4.2 节点注册器

// Node registry interface
/**
 * Maps node types to the classes that execute them. The execution engine
 * looks up a class by node type and instantiates it with the workflow node.
 */
interface NodeRegistry {
  /** Associates `nodeClass` with `nodeType`. */
  register(nodeType: NodeType, nodeClass: new (node: WorkflowNode) => BaseNode): void;
  /** Returns the class registered for `nodeType`, or `undefined` when there is none. */
  getNode(nodeType: NodeType): (new (node: WorkflowNode) => BaseNode) | undefined;
  /** Removes the registration for `nodeType`. */
  unregister(nodeType: NodeType): void;
  /** Lists the node types that currently have a registered class. */
  getRegisteredTypes(): NodeType[];
}

// Default node registry
/**
 * In-memory node registry backed by a `Map` from node type to node class.
 * Registering a type that already exists replaces the earlier class.
 */
class DefaultNodeRegistry implements NodeRegistry {
  private nodes = new Map<NodeType, new (node: WorkflowNode) => BaseNode>();

  /** Stores `nodeClass` under `nodeType`, replacing any earlier entry. */
  register(nodeType: NodeType, nodeClass: new (node: WorkflowNode) => BaseNode): void {
    const registrations = this.nodes;
    registrations.set(nodeType, nodeClass);
  }

  /** Returns the class stored under `nodeType`, or `undefined` when absent. */
  getNode(nodeType: NodeType): (new (node: WorkflowNode) => BaseNode) | undefined {
    const registered = this.nodes.get(nodeType);
    return registered;
  }

  /** Removes the entry for `nodeType`; unknown types are ignored. */
  unregister(nodeType: NodeType): void {
    const registrations = this.nodes;
    registrations.delete(nodeType);
  }

  /** Lists the registered node types in registration order. */
  getRegisteredTypes(): NodeType[] {
    return [...this.nodes.keys()];
  }
}

8.5 工作流引擎使用示例

8.5.1 完整使用示例

// Workflow engine usage example
/**
 * End-to-end demonstration of the workflow engine: defines a leave-approval
 * workflow, starts an instance, completes the pending user tasks and reports
 * status, logging progress to the console along the way.
 */
class LowCodeWorkflowEngineDemo {
  private engine: LowCodeWorkflowEngine;

  /** Creates the engine on a fresh event bus and attaches the demo's event logging. */
  constructor() {
    const eventBus = new EventBus();

    this.engine = new LowCodeWorkflowEngine({
      eventBus
    });

    this.setupEventListeners();
  }

  /**
   * Runs the four demo steps in order. The returned promise settles only
   * after every step has finished, and rejects if any step fails.
   */
  async demo(): Promise<void> {
    console.log('=== 低代码工作流引擎演示 ===');

    // 1. Create the leave-approval workflow
    await this.createLeaveApprovalWorkflow();

    // 2. Start a workflow instance
    await this.startLeaveApprovalProcess();

    // 3. Handle user tasks
    await this.handleUserTasks();

    // 4. Query workflow status
    await this.queryWorkflowStatus();
  }

  /** Registers the leave-approval workflow definition with the engine. */
  private async createLeaveApprovalWorkflow(): Promise<void> {
    console.log('\n1. 创建请假审批工作流');

    const workflowDefinition: WorkflowDefinition = {
      id: 'leave_approval',
      name: '请假审批流程',
      description: '员工请假审批工作流',
      version: '1.0.0',
      nodes: [
        {
          id: 'start',
          type: NodeType.START,
          name: '开始',
          properties: {},
          position: { x: 100, y: 100 }
        },
        {
          id: 'submit_application',
          type: NodeType.USER_TASK,
          name: '提交请假申请',
          properties: {
            taskName: '填写请假申请',
            description: '请填写请假申请表单',
            formKey: 'leave_application_form',
            candidateGroups: ['employees']
          },
          position: { x: 300, y: 100 }
        },
        {
          id: 'manager_approval',
          type: NodeType.USER_TASK,
          name: '经理审批',
          properties: {
            taskName: '审批请假申请',
            description: '请审批员工的请假申请',
            assignee: '${applicant.manager}',
            dueDate: 'P1D' // due within one day
          },
          position: { x: 500, y: 100 }
        },
        {
          id: 'approval_gateway',
          type: NodeType.EXCLUSIVE_GATEWAY,
          name: '审批结果判断',
          properties: {},
          position: { x: 700, y: 100 }
        },
        {
          id: 'hr_approval',
          type: NodeType.USER_TASK,
          name: 'HR审批',
          properties: {
            taskName: 'HR最终审批',
            description: '请进行最终审批',
            candidateGroups: ['hr']
          },
          position: { x: 700, y: 300 }
        },
        {
          id: 'send_notification',
          type: NodeType.SERVICE_TASK,
          name: '发送通知',
          properties: {
            serviceClass: 'NotificationService',
            method: 'sendLeaveApprovalNotification',
            parameters: {
              applicantId: '${applicant.id}',
              approved: '${approved}',
              reason: '${approvalReason}'
            }
          },
          position: { x: 900, y: 200 }
        },
        {
          id: 'end',
          type: NodeType.END,
          name: '结束',
          properties: {},
          position: { x: 1100, y: 200 }
        }
      ],
      edges: [
        {
          id: 'edge1',
          sourceId: 'start',
          targetId: 'submit_application'
        },
        {
          id: 'edge2',
          sourceId: 'submit_application',
          targetId: 'manager_approval'
        },
        {
          id: 'edge3',
          sourceId: 'manager_approval',
          targetId: 'approval_gateway'
        },
        {
          id: 'edge4',
          sourceId: 'approval_gateway',
          targetId: 'hr_approval',
          condition: 'leaveDays > 3 && approved === true',
          label: '超过3天且经理同意'
        },
        {
          id: 'edge5',
          sourceId: 'approval_gateway',
          targetId: 'send_notification',
          condition: 'leaveDays <= 3 || approved === false',
          label: '3天以内或经理拒绝'
        },
        {
          id: 'edge6',
          sourceId: 'hr_approval',
          targetId: 'send_notification'
        },
        {
          id: 'edge7',
          sourceId: 'send_notification',
          targetId: 'end'
        }
      ],
      variables: {
        applicant: null,
        leaveDays: 0,
        leaveReason: '',
        approved: false,
        approvalReason: ''
      },
      settings: {
        timeout: 7 * 24 * 60 * 60 * 1000, // 7-day timeout
        retryPolicy: {
          maxRetries: 3,
          retryDelay: 1000
        },
        errorHandling: {
          onError: 'terminate'
        }
      }
    };

    const workflow = await this.engine.createWorkflow(workflowDefinition);
    console.log(`工作流创建成功: ${workflow.id}`);
  }

  /** Starts one instance of the leave-approval workflow with sample data. */
  private async startLeaveApprovalProcess(): Promise<void> {
    console.log('\n2. 启动请假审批流程');

    const context: WorkflowContext = {
      initiator: 'user123',
      businessKey: 'leave_2024_001',
      variables: {
        applicant: {
          id: 'user123',
          name: '张三',
          manager: 'manager456'
        },
        leaveDays: 5,
        leaveReason: '家庭事务',
        leaveStartDate: '2024-01-15',
        leaveEndDate: '2024-01-19'
      }
    };

    const instance = await this.engine.startWorkflow('leave_approval', context);
    console.log(`工作流实例启动成功: ${instance.id}`);
    console.log(`当前状态: ${instance.status}`);
  }

  /**
   * Waits one second, then completes every task that is still in the
   * CREATED status with an approving result.
   *
   * The wait is awaited rather than scheduled with a detached timer, so the
   * returned promise covers the actual work and any failure of
   * `completeTask` reaches the caller of `demo()`.
   */
  private async handleUserTasks(): Promise<void> {
    console.log('\n3. 处理用户任务');

    await this.delay(1000);

    // The demo reaches into the engine to obtain the task manager.
    const taskManager = (this.engine as any).executionEngine?.getTaskManager();
    if (!taskManager) return;

    const pendingTasks = taskManager.queryTasks({
      status: TaskStatus.CREATED
    });

    console.log(`找到 ${pendingTasks.length} 个待处理任务`);

    for (const task of pendingTasks) {
      console.log(`处理任务: ${task.name} (${task.id})`);

      // Simulated task outcome
      const result: TaskResult = {
        success: true,
        data: {
          approved: true,
          approvalReason: '同意请假申请'
        },
        variables: {
          approved: true,
          approvalReason: '同意请假申请'
        }
      };

      await this.engine.completeTask(task.id, result);
      console.log(`任务完成: ${task.id}`);
    }
  }

  /**
   * Waits two seconds and prints a placeholder status message. A real
   * implementation would query the workflow instance here.
   */
  private async queryWorkflowStatus(): Promise<void> {
    console.log('\n4. 查询工作流状态');

    await this.delay(2000);
    console.log('工作流执行状态查询功能演示');
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });
  }

  /** Logs workflow and task lifecycle events emitted by the engine. */
  private setupEventListeners(): void {
    this.engine.on('workflow:started', (event) => {
      console.log(`[事件] 工作流启动: ${event.instance.id}`);
    });

    this.engine.on('workflow:completed', (event) => {
      console.log(`[事件] 工作流完成: ${event.instance.id}`);
    });

    this.engine.on('workflow:failed', (event) => {
      console.log(`[事件] 工作流失败: ${event.instance.id}, 错误: ${event.error.message}`);
    });

    this.engine.on('task:created', (event) => {
      console.log(`[事件] 任务创建: ${event.task.name} (${event.task.id})`);
    });

    this.engine.on('task:completed', (event) => {
      console.log(`[事件] 任务完成: ${event.task.name} (${event.task.id})`);
    });
  }
}

// Run the demo; any rejection from demo() is reported on stderr.
const demo = new LowCodeWorkflowEngineDemo();
demo.demo().catch((error) => {
  console.error(error);
});

8.6 小结

本章详细介绍了低代码平台中工作流引擎的设计与实现,涵盖了以下核心内容:

8.6.1 核心要点

  1. 工作流引擎架构

    • 采用事件驱动的架构设计
    • 支持多种节点类型和执行模式
    • 提供完整的生命周期管理
  2. 节点系统设计

    • 抽象节点基类和具体节点实现
    • 支持用户任务、服务任务、脚本任务等
    • 网关节点实现条件分支和并行处理
  3. 实例管理

    • 工作流实例的状态管理
    • 支持暂停、恢复、终止操作
    • 完整的执行上下文和变量管理
  4. 任务管理

    • 用户任务的创建、分配、执行
    • 支持候选用户和候选组
    • 任务查询和状态跟踪

8.6.2 技术特色

  1. 类型安全: 使用 TypeScript 提供完整的类型定义
  2. 事件驱动: 基于事件总线的松耦合架构
  3. 可扩展性: 支持自定义节点类型和执行器
  4. 状态管理: 完整的工作流和任务状态跟踪
  5. 错误处理: 完善的异常处理和恢复机制

8.6.3 最佳实践

  1. 工作流设计: 保持流程简洁,避免过度复杂
  2. 节点实现: 确保节点执行的幂等性
  3. 错误处理: 实现适当的重试和回滚机制
  4. 性能优化: 合理使用并行网关提高执行效率
  5. 监控告警: 建立完善的流程监控体系

8.6.4 扩展方向

  1. 可视化设计器: 图形化的工作流设计界面
  2. 流程监控: 实时的流程执行监控和统计
  3. 版本管理: 工作流定义的版本控制
  4. 集成能力: 与外部系统的集成接口
  5. 规则引擎: 更强大的条件判断和规则处理

下一章我们将学习页面生成器与动态页面,了解如何基于配置动态生成完整的页面应用。

// End node
/**
 * Terminal node of a workflow. Executing it always succeeds and flags the
 * result as `completed`, handing back the current variables as data.
 */
class EndNode extends BaseNode {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    return {
      success: true,
      data: context.variables,
      completed: true
    };
  }
}

// User task node
/**
 * Node that creates a task for a person and then waits. Executing it
 * registers the task with the engine and returns a `waiting` result that
 * carries the task id.
 */
class UserTaskNode extends BaseNode {
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    const task = await this.createUserTask(context);

    // Execution pauses here until the task is completed.
    return {
      success: true,
      data: context.variables,
      waiting: true,
      taskId: task.id
    };
  }

  /**
   * Builds a task from this node's properties, hands it to the engine and
   * returns it. The task receives a shallow copy of the current variables.
   */
  private async createUserTask(context: ExecutionContext): Promise<Task> {
    const task: Task = {
      id: this.generateTaskId(),
      instanceId: context.instance.id,
      nodeId: this.id,
      type: TaskType.USER_TASK,
      name: this.getProperty('taskName', this.name),
      description: this.getProperty('description'),
      assignee: this.getProperty('assignee'),
      candidateUsers: this.getProperty('candidateUsers', []),
      candidateGroups: this.getProperty('candidateGroups', []),
      dueDate: this.calculateDueDate(),
      priority: this.getProperty('priority', TaskPriority.NORMAL),
      formKey: this.getProperty('formKey'),
      variables: { ...context.variables },
      status: TaskStatus.CREATED,
      createdTime: new Date()
    };

    // Persist the task through the engine.
    context.engine.addTask(task);

    return task;
  }

  /**
   * Computes the task's due date from the `dueDate` property.
   *
   * A value starting with `P` is treated as a duration relative to now
   * (e.g. "P1D" = one day from now, "PT2H" = two hours from now); any other
   * value is passed to the `Date` constructor.
   *
   * @returns The due date, or `undefined` when the property is not set.
   */
  private calculateDueDate(): Date | undefined {
    const dueDateExpression = this.getProperty('dueDate');
    if (!dueDateExpression) {
      return undefined;
    }

    const now = new Date();

    if (dueDateExpression.startsWith('P')) {
      return this.parseDuration(dueDateExpression, now);
    }

    // Plain date value
    return new Date(dueDateExpression);
  }

  /**
   * Adds a simplified ISO 8601 duration to `baseDate`.
   *
   * Supports the day, hour and minute components in any valid combination,
   * e.g. "P1D", "PT2H", "PT30M", "P1DT2H" or "PT1H30M". Components that are
   * not recognised are ignored, so an unsupported string yields a copy of
   * `baseDate`.
   *
   * @returns A new Date; `baseDate` is not modified.
   */
  private parseDuration(duration: string, baseDate: Date): Date {
    const result = new Date(baseDate);

    // One pass over the whole duration so that combined forms work: the
    // time components only ever follow a single "T" designator.
    const match = /^P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?/.exec(duration);
    if (!match) {
      return result;
    }

    const [, days, hours, minutes] = match;
    if (days) {
      result.setDate(result.getDate() + parseInt(days, 10));
    }
    if (hours) {
      result.setHours(result.getHours() + parseInt(hours, 10));
    }
    if (minutes) {
      result.setMinutes(result.getMinutes() + parseInt(minutes, 10));
    }

    return result;
  }

  /** Generates a task id from the current time plus a random base-36 suffix. */
  private generateTaskId(): string {
    return 'task_' + Date.now() + '_' + Math.random().toString(36).slice(2, 11);
  }
}

// Service task node
/**
 * Node that calls a service method. Parameters may reference workflow
 * variables with the `${name}` syntax; the call result can be stored back
 * into a variable named by the `resultVariable` property.
 */
class ServiceTaskNode extends BaseNode {
  /**
   * Resolves the parameters, invokes the service and continues with every
   * node reachable through this node's outgoing edges. Any error is reported
   * as a failed result rather than thrown.
   */
  async execute(context: ExecutionContext): Promise<ExecutionResult> {
    try {
      const serviceClass = this.getProperty('serviceClass');
      const method = this.getProperty('method');
      const parameters = this.getProperty('parameters', {});

      // Substitute variable references in the parameters.
      const resolvedParameters = this.resolveParameters(parameters, context.variables);

      const result = await this.invokeService(serviceClass, method, resolvedParameters);

      // Optionally publish the result as a workflow variable.
      const resultVariable = this.getProperty('resultVariable');
      if (resultVariable) {
        context.variables[resultVariable] = result;
      }

      return {
        success: true,
        data: result,
        nextNodes: this.getOutgoingNodes(context)
      };
    } catch (error) {
      return {
        success: false,
        // Thrown values are not guaranteed to be Error objects.
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Returns a copy of `parameters` in which every string of the exact form
   * `${reference}` is replaced by the referenced variable. Other values are
   * copied unchanged.
   */
  private resolveParameters(
    parameters: Record<string, any>,
    variables: Record<string, any>
  ): Record<string, any> {
    const resolved: Record<string, any> = {};

    for (const [key, value] of Object.entries(parameters)) {
      if (typeof value === 'string' && value.startsWith('${') && value.endsWith('}')) {
        resolved[key] = this.lookupVariable(variables, value.slice(2, -1));
      } else {
        resolved[key] = value;
      }
    }

    return resolved;
  }

  /**
   * Resolves a variable reference. A variable whose name equals the whole
   * reference wins; otherwise the reference is read as a dot-separated path
   * (e.g. "applicant.id"), yielding `undefined` as soon as a segment is missing.
   */
  private lookupVariable(variables: Record<string, any>, reference: string): any {
    if (Object.prototype.hasOwnProperty.call(variables, reference)) {
      return variables[reference];
    }

    let current: any = variables;
    for (const segment of reference.split('.')) {
      if (current === null || current === undefined) {
        return undefined;
      }
      current = current[segment];
    }
    return current;
  }

  /**
   * Placeholder service call: logs the request and resolves after 100 ms
   * with a success marker and timestamp. A real implementation would obtain
   * the service instance (for example from a dependency-injection container)
   * and invoke the method on it.
   */
  private async invokeService(
    serviceClass: string,
    method: string,
    parameters: Record<string, any>
  ): Promise<any> {
    console.log(`Invoking service: ${serviceClass}.${method}`, parameters);

    // Simulated asynchronous service call
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({ success: true, timestamp: new Date() });
      }, 100);
    });
  }

  /** Returns the target ids of every edge leaving this node. */
  private getOutgoingNodes(context: ExecutionContext): string[] {
    const workflow = context.workflow;
    const outgoingEdges = workflow.getOutgoingEdges(this.id);
    return outgoingEdges.map(edge => edge.targetId);
  }
}