工作流引擎是低代码平台的重要组成部分,它能够自动化业务流程,提高工作效率,减少人工干预。本章将详细介绍如何设计和实现一个功能完善的工作流引擎系统。

12.1 工作流引擎架构概览

12.1.1 工作流引擎管理器接口

// Workflow engine manager interface: the public API for managing workflow
// definitions, running instances, steps, tasks and event subscriptions.
interface WorkflowEngine {
  // Workflow definition management
  createWorkflow(definition: WorkflowDefinition): Promise<Workflow>;
  updateWorkflow(id: string, definition: WorkflowDefinition): Promise<Workflow>;
  deleteWorkflow(id: string): Promise<void>;
  getWorkflow(id: string): Promise<Workflow | null>;
  getWorkflows(query?: WorkflowQuery): Promise<Workflow[]>;
  
  // Workflow instance management
  startWorkflow(workflowId: string, input?: Record<string, any>, context?: WorkflowContext): Promise<WorkflowInstance>;
  pauseWorkflow(instanceId: string): Promise<void>;
  resumeWorkflow(instanceId: string): Promise<void>;
  cancelWorkflow(instanceId: string, reason?: string): Promise<void>;
  retryWorkflow(instanceId: string, fromStep?: string): Promise<void>;
  
  // Workflow execution
  executeStep(instanceId: string, stepId: string, input?: Record<string, any>): Promise<StepResult>;
  completeStep(instanceId: string, stepId: string, output?: Record<string, any>): Promise<void>;
  skipStep(instanceId: string, stepId: string, reason?: string): Promise<void>;
  
  // Workflow monitoring
  getInstance(instanceId: string): Promise<WorkflowInstance | null>;
  getInstances(query?: InstanceQuery): Promise<WorkflowInstance[]>;
  getInstanceHistory(instanceId: string): Promise<WorkflowEvent[]>;
  getInstanceMetrics(instanceId: string): Promise<InstanceMetrics>;
  
  // Task management
  getTasks(query?: TaskQuery): Promise<Task[]>;
  assignTask(taskId: string, assignee: string): Promise<void>;
  completeTask(taskId: string, output?: Record<string, any>): Promise<void>;
  delegateTask(taskId: string, assignee: string): Promise<void>;
  
  // Event handling
  on(event: string, handler: WorkflowEventHandler): void;
  off(event: string, handler: WorkflowEventHandler): void;
  emit(event: string, data: any): void;
}

/**
 * Workflow engine implementation.
 *
 * Keeps workflow definitions, instances and tasks in in-memory maps, records
 * an audit entry for every state change and publishes lifecycle events on the
 * event bus. Step execution is delegated to the executor; the engine reacts to
 * the resulting `workflow.step.*` events to advance, retry or fail instances.
 *
 * NOTE(review): `handleInstanceCompleted`, `handleInstanceFailed` and
 * `executeCompensationStep` are referenced in this class but not defined in
 * this listing, and several `WorkflowEngine` members (pause/resume/cancel/
 * retry, step and task operations, history and metrics) are absent as well —
 * confirm they are provided before this class is used as-is.
 */
class LowCodeWorkflowEngine implements WorkflowEngine {
  private workflows: Map<string, Workflow> = new Map();
  private instances: Map<string, WorkflowInstance> = new Map();
  private tasks: Map<string, Task> = new Map();
  private eventBus: EventBus;
  private scheduler: WorkflowScheduler;
  private executor: WorkflowExecutor;
  private auditLogger: AuditLogger;

  /**
   * @param options Engine collaborators. `auditLogger` is required; the event
   *   bus, scheduler and executor are created on demand when not supplied.
   */
  constructor(options: WorkflowEngineOptions) {
    this.eventBus = options.eventBus ?? new EventBus();
    // Injected collaborators take precedence over the built-in defaults.
    // NOTE(review): the executor fallback constructs `WorkflowExecutor`, which
    // this chapter declares as an interface — a concrete class is presumably
    // intended; confirm and pass `options.executor` until then.
    this.scheduler = options.scheduler ?? new WorkflowScheduler(this.eventBus);
    this.executor = options.executor ?? new WorkflowExecutor(this.eventBus);
    this.auditLogger = options.auditLogger;

    this.setupEventHandlers();
  }

  /** Subscribes the engine's own handlers to step and instance lifecycle events. */
  private setupEventHandlers(): void {
    this.eventBus.on('workflow.step.completed', this.handleStepCompleted.bind(this));
    this.eventBus.on('workflow.step.failed', this.handleStepFailed.bind(this));
    this.eventBus.on('workflow.instance.completed', this.handleInstanceCompleted.bind(this));
    this.eventBus.on('workflow.instance.failed', this.handleInstanceFailed.bind(this));
  }

  /**
   * Validates a definition and registers it as a new version-1 draft workflow.
   *
   * @param definition Workflow definition to register.
   * @returns The stored workflow.
   * @throws Error when the definition fails validation.
   */
  async createWorkflow(definition: WorkflowDefinition): Promise<Workflow> {
    // Validate before building anything so an invalid definition has no effect.
    const validation = this.validateWorkflowDefinition(definition);
    if (!validation.isValid) {
      throw new Error(`工作流定义验证失败: ${validation.errors.join(', ')}`);
    }

    const workflow: Workflow = {
      id: this.generateId(),
      name: definition.name,
      description: definition.description,
      version: 1,
      status: WorkflowStatus.DRAFT,
      definition,
      createdAt: new Date(),
      updatedAt: new Date()
    };

    this.workflows.set(workflow.id, workflow);

    await this.auditLogger.log({
      action: 'workflow.created',
      resourceType: 'workflow',
      resourceId: workflow.id,
      details: { name: workflow.name }
    });

    this.eventBus.emit('workflow.created', { workflowId: workflow.id, workflow });

    return workflow;
  }

  /**
   * Creates a running instance of a published workflow and executes its first step.
   *
   * @param workflowId Id of the workflow to start.
   * @param input Initial input; also used to seed the instance variables.
   * @param context Caller context; `userId` becomes `startedBy` (default `'system'`).
   * @returns The newly created instance.
   * @throws Error when the workflow does not exist or is not published.
   */
  async startWorkflow(
    workflowId: string,
    input: Record<string, any> = {},
    context: WorkflowContext = {}
  ): Promise<WorkflowInstance> {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) {
      throw new Error(`工作流不存在: ${workflowId}`);
    }

    if (workflow.status !== WorkflowStatus.PUBLISHED) {
      throw new Error(`工作流未发布,无法启动: ${workflowId}`);
    }

    const instance: WorkflowInstance = {
      id: this.generateId(),
      workflowId,
      workflowVersion: workflow.version,
      status: InstanceStatus.RUNNING,
      input,
      output: {},
      context: {
        ...context,
        startedBy: context.userId || 'system',
        startedAt: new Date()
      },
      currentStep: null,
      steps: new Map(),
      // Shallow copy so later variable updates do not mutate the caller's input.
      variables: { ...input },
      createdAt: new Date(),
      updatedAt: new Date()
    };

    this.instances.set(instance.id, instance);

    await this.auditLogger.log({
      action: 'workflow.instance.started',
      resourceType: 'workflow_instance',
      resourceId: instance.id,
      details: { workflowId, input }
    });

    this.eventBus.emit('workflow.instance.started', { instanceId: instance.id, instance });

    await this.executeNextStep(instance);

    return instance;
  }

  /**
   * Resolves the step that follows the instance's current step and runs it;
   * completes the instance when no further step can be found.
   *
   * @throws Error when the instance's workflow no longer exists.
   */
  private async executeNextStep(instance: WorkflowInstance): Promise<void> {
    const workflow = this.workflows.get(instance.workflowId);
    if (!workflow) {
      throw new Error(`工作流不存在: ${instance.workflowId}`);
    }

    const nextStep = this.findNextStep(workflow.definition, instance);
    if (!nextStep) {
      await this.completeWorkflowInstance(instance);
      return;
    }

    instance.currentStep = nextStep.id;
    instance.updatedAt = new Date();

    const stepInstance: StepInstance = {
      id: this.generateId(),
      stepId: nextStep.id,
      instanceId: instance.id,
      status: StepStatus.RUNNING,
      input: this.prepareStepInput(nextStep, instance),
      output: {},
      startedAt: new Date(),
      attempts: 0
    };

    // Step instances are keyed by the step definition id.
    instance.steps.set(nextStep.id, stepInstance);

    this.eventBus.emit('workflow.step.started', {
      instanceId: instance.id,
      stepId: nextStep.id,
      stepInstance
    });

    await this.runStep(instance, nextStep, stepInstance);
  }

  /**
   * Hands a step to the executor and deals with a rejected execution.
   *
   * An executor that has already marked the step FAILED is assumed to have
   * published `workflow.step.failed` too (the executor in this chapter does
   * both before rethrowing). That event is processed by `handleStepFailed`,
   * so the rejection is ignored here; handling it a second time would bypass
   * the retry policy and fail the instance twice. Rejections raised before
   * the executor touched the step (e.g. unsupported step type) are handled
   * directly.
   */
  private async runStep(
    instance: WorkflowInstance,
    step: StepDefinition,
    stepInstance: StepInstance
  ): Promise<void> {
    try {
      await this.executor.executeStep(step, stepInstance, instance);
    } catch (error) {
      if (stepInstance.status === StepStatus.FAILED) {
        return;
      }
      await this.handleStepError(instance, step, error);
    }
  }

  /**
   * Finds the step to execute next.
   *
   * With no current step this is the START step, or the first declared step.
   * Otherwise it is the target of the first outgoing connection whose
   * condition holds.
   *
   * @returns The next step, or `null` when there is none (including when a
   *   matching connection points at a step that does not exist).
   */
  private findNextStep(definition: WorkflowDefinition, instance: WorkflowInstance): StepDefinition | null {
    const { steps, connections } = definition;

    if (!instance.currentStep) {
      return steps.find(step => step.type === StepType.START) ?? steps[0] ?? null;
    }

    const currentStep = steps.find(step => step.id === instance.currentStep);
    if (!currentStep) {
      return null;
    }

    for (const connection of connections) {
      if (connection.from === currentStep.id && this.evaluateCondition(connection.condition, instance)) {
        return steps.find(step => step.id === connection.to) ?? null;
      }
    }

    return null;
  }

  /** Builds the object exposed to condition and mapping expressions. */
  private buildExpressionScope(instance: WorkflowInstance): Record<string, any> {
    return {
      variables: instance.variables,
      output: instance.output,
      context: instance.context
    };
  }

  /**
   * Evaluates a connection condition against the instance state.
   *
   * SECURITY: the condition is compiled with `new Function` and therefore runs
   * as arbitrary JavaScript with the privileges of this process. Only use it
   * with trusted workflow definitions, or replace it with a sandboxed
   * expression engine.
   *
   * @returns `true` when no condition is given or the expression is truthy;
   *   `false` when it is falsy or fails to compile/evaluate.
   */
  private evaluateCondition(condition: string | undefined, instance: WorkflowInstance): boolean {
    if (!condition) {
      return true;
    }

    try {
      const scope = this.buildExpressionScope(instance);
      // Coerce so the declared boolean return type holds for any expression.
      return Boolean(new Function('context', `with(context) { return ${condition}; }`)(scope));
    } catch (error) {
      console.error('条件评估失败:', error);
      return false;
    }
  }

  /** Builds a step's input by resolving every entry of its `inputMapping`. */
  private prepareStepInput(step: StepDefinition, instance: WorkflowInstance): Record<string, any> {
    const input: Record<string, any> = {};

    if (step.inputMapping) {
      for (const [key, mapping] of Object.entries(step.inputMapping)) {
        input[key] = this.resolveMapping(mapping, instance);
      }
    }

    return input;
  }

  /**
   * Resolves one mapping value. A value of the form `${expression}` is
   * evaluated against the instance state; anything else is returned verbatim.
   *
   * SECURITY: expressions are compiled with `new Function`, with the same
   * caveat as `evaluateCondition`.
   *
   * @returns The evaluated value, `null` when evaluation fails, or the literal mapping.
   */
  private resolveMapping(mapping: string, instance: WorkflowInstance): any {
    if (!(mapping.startsWith('${') && mapping.endsWith('}'))) {
      return mapping;
    }

    const expression = mapping.slice(2, -1);
    try {
      const scope = this.buildExpressionScope(instance);
      return new Function('context', `with(context) { return ${expression}; }`)(scope);
    } catch (error) {
      console.error('映射解析失败:', error);
      return null;
    }
  }

  /**
   * Handles `workflow.step.completed`: records the step result, merges the
   * output into the instance variables and moves on to the next step.
   * Events for unknown instances or steps are ignored.
   */
  private async handleStepCompleted(event: any): Promise<void> {
    const { instanceId, stepId, output } = event;
    const instance = this.instances.get(instanceId);

    if (!instance) {
      return;
    }

    const stepInstance = instance.steps.get(stepId);
    if (!stepInstance) {
      return;
    }

    stepInstance.status = StepStatus.COMPLETED;
    // The event may carry no output; keep the field a record as its type promises.
    stepInstance.output = output ?? {};
    stepInstance.completedAt = new Date();

    if (output) {
      Object.assign(instance.variables, output);
    }

    instance.updatedAt = new Date();

    await this.auditLogger.log({
      action: 'workflow.step.completed',
      resourceType: 'workflow_step',
      resourceId: stepId,
      details: { instanceId, output }
    });

    await this.executeNextStep(instance);
  }

  /**
   * Handles `workflow.step.failed`: retries the step while its retry policy
   * allows, otherwise applies the step's error-handling strategy.
   * Events for unknown instances or steps are ignored.
   */
  private async handleStepFailed(event: any): Promise<void> {
    const { instanceId, stepId, error } = event;
    const instance = this.instances.get(instanceId);

    if (!instance) {
      return;
    }

    const stepInstance = instance.steps.get(stepId);
    if (!stepInstance) {
      return;
    }

    stepInstance.attempts++;

    const workflow = this.workflows.get(instance.workflowId);
    const stepDefinition = workflow?.definition.steps.find(s => s.id === stepId);

    if (!stepDefinition) {
      // Without the definition neither the retry policy nor the error strategy
      // can be consulted, so the only safe outcome is to fail the instance.
      const message = this.toErrorMessage(error);
      stepInstance.status = StepStatus.FAILED;
      stepInstance.error = message;
      stepInstance.completedAt = new Date();
      await this.failWorkflowInstance(instance, message);
      return;
    }

    // `??` keeps an explicit 0 (no retries / no delay) instead of replacing it.
    const maxRetries = stepDefinition.retryPolicy?.maxRetries ?? 0;
    if (stepInstance.attempts <= maxRetries) {
      const delay = stepDefinition.retryPolicy?.delay ?? 1000;
      setTimeout(() => {
        void this.runStep(instance, stepDefinition, stepInstance);
      }, delay);
      return;
    }

    // Retries exhausted: the failure is final.
    await this.handleStepError(instance, stepDefinition, error);
  }

  /**
   * Marks a step as finally failed and applies its error-handling strategy.
   *
   * - CONTINUE: proceed with the next step.
   * - COMPENSATE: run the configured compensation step; fail the instance
   *   when none is configured.
   * - RETRY / ABORT / unspecified: fail the instance. Retries are scheduled by
   *   `handleStepFailed`, so by the time this runs none is pending.
   *
   * @param error An `Error`, an error message string (as carried by
   *   `workflow.step.failed` events) or any other thrown value.
   */
  private async handleStepError(
    instance: WorkflowInstance,
    step: StepDefinition,
    error: unknown
  ): Promise<void> {
    const message = this.toErrorMessage(error);

    const stepInstance = instance.steps.get(step.id);
    if (stepInstance) {
      stepInstance.status = StepStatus.FAILED;
      stepInstance.error = message;
      stepInstance.completedAt = new Date();
    }

    switch (step.errorHandling?.strategy) {
      case ErrorHandlingStrategy.CONTINUE:
        await this.executeNextStep(instance);
        break;

      case ErrorHandlingStrategy.COMPENSATE:
        if (step.errorHandling.compensationStep) {
          await this.executeCompensationStep(instance, step.errorHandling.compensationStep);
        } else {
          await this.failWorkflowInstance(instance, message);
        }
        break;

      case ErrorHandlingStrategy.RETRY:
      case ErrorHandlingStrategy.ABORT:
      default:
        await this.failWorkflowInstance(instance, message);
        break;
    }
  }

  /** Extracts a readable message from an `Error`, a string or any other value. */
  private toErrorMessage(error: unknown): string {
    if (error instanceof Error) {
      return error.message;
    }
    if (typeof error === 'string') {
      return error;
    }
    if (error == null) {
      return '未知错误';
    }
    const candidate = (error as { message?: unknown }).message;
    return typeof candidate === 'string' ? candidate : String(error);
  }

  /** Whether the instance has reached a state it cannot leave on its own. */
  private isFinished(instance: WorkflowInstance): boolean {
    return (
      instance.status === InstanceStatus.COMPLETED ||
      instance.status === InstanceStatus.FAILED ||
      instance.status === InstanceStatus.CANCELLED
    );
  }

  /**
   * Marks the instance as completed, audits it and publishes
   * `workflow.instance.completed`. Does nothing for an instance that has
   * already finished, so the transition is recorded only once.
   */
  private async completeWorkflowInstance(instance: WorkflowInstance): Promise<void> {
    if (this.isFinished(instance)) {
      return;
    }

    instance.status = InstanceStatus.COMPLETED;
    instance.completedAt = new Date();
    instance.updatedAt = new Date();

    await this.auditLogger.log({
      action: 'workflow.instance.completed',
      resourceType: 'workflow_instance',
      resourceId: instance.id,
      details: { workflowId: instance.workflowId, output: instance.output }
    });

    this.eventBus.emit('workflow.instance.completed', { instanceId: instance.id, instance });
  }

  /**
   * Marks the instance as failed with the given reason, audits it and
   * publishes `workflow.instance.failed`. Does nothing for an instance that
   * has already finished, so the transition is recorded only once.
   */
  private async failWorkflowInstance(instance: WorkflowInstance, reason: string): Promise<void> {
    if (this.isFinished(instance)) {
      return;
    }

    instance.status = InstanceStatus.FAILED;
    instance.error = reason;
    instance.completedAt = new Date();
    instance.updatedAt = new Date();

    await this.auditLogger.log({
      action: 'workflow.instance.failed',
      resourceType: 'workflow_instance',
      resourceId: instance.id,
      details: { workflowId: instance.workflowId, reason }
    });

    this.eventBus.emit('workflow.instance.failed', { instanceId: instance.id, instance, reason });
  }

  /**
   * Checks a workflow definition for structural problems.
   *
   * Errors: missing name, no steps, steps without id/name/type, duplicate step
   * ids, connections that reference unknown steps. Warnings: no START or END
   * step. Missing `steps` or `connections` arrays are reported or tolerated
   * rather than causing an exception.
   */
  private validateWorkflowDefinition(definition: WorkflowDefinition): ValidationResult {
    const errors: string[] = [];
    const warnings: string[] = [];
    const steps = definition.steps ?? [];
    const connections = definition.connections ?? [];

    if (!definition.name) {
      errors.push('工作流名称不能为空');
    }

    if (steps.length === 0) {
      errors.push('工作流必须包含至少一个步骤');
    }

    const stepIds = new Set<string>();
    for (const step of steps) {
      if (!step.id) {
        errors.push('步骤ID不能为空');
      } else if (stepIds.has(step.id)) {
        errors.push(`重复的步骤ID: ${step.id}`);
      } else {
        stepIds.add(step.id);
      }

      if (!step.name) {
        errors.push(`步骤 ${step.id} 的名称不能为空`);
      }

      if (!step.type) {
        errors.push(`步骤 ${step.id} 的类型不能为空`);
      }
    }

    for (const connection of connections) {
      if (!stepIds.has(connection.from)) {
        errors.push(`连接引用了不存在的源步骤: ${connection.from}`);
      }

      if (!stepIds.has(connection.to)) {
        errors.push(`连接引用了不存在的目标步骤: ${connection.to}`);
      }
    }

    if (!steps.some(step => step.type === StepType.START)) {
      warnings.push('建议添加一个开始步骤');
    }

    if (!steps.some(step => step.type === StepType.END)) {
      warnings.push('建议添加一个结束步骤');
    }

    return {
      isValid: errors.length === 0,
      errors,
      warnings
    };
  }

  /**
   * Generates a `wf_`-prefixed id with up to nine random base-36 characters.
   * Not cryptographically secure and not guaranteed to be unique.
   */
  private generateId(): string {
    return 'wf_' + Math.random().toString(36).slice(2, 11);
  }

  /**
   * Replaces a workflow's definition and increments its version.
   *
   * @throws Error when the workflow does not exist or the definition is invalid.
   */
  async updateWorkflow(id: string, definition: WorkflowDefinition): Promise<Workflow> {
    const workflow = this.workflows.get(id);
    if (!workflow) {
      throw new Error(`工作流不存在: ${id}`);
    }

    const validation = this.validateWorkflowDefinition(definition);
    if (!validation.isValid) {
      throw new Error(`工作流定义验证失败: ${validation.errors.join(', ')}`);
    }

    workflow.definition = definition;
    workflow.version++;
    workflow.updatedAt = new Date();

    await this.auditLogger.log({
      action: 'workflow.updated',
      resourceType: 'workflow',
      resourceId: id,
      details: { version: workflow.version }
    });

    this.eventBus.emit('workflow.updated', { workflowId: id, workflow });

    return workflow;
  }

  /**
   * Deletes a workflow that has no running instances.
   *
   * @throws Error when the workflow does not exist or still has running instances.
   */
  async deleteWorkflow(id: string): Promise<void> {
    const workflow = this.workflows.get(id);
    if (!workflow) {
      throw new Error(`工作流不存在: ${id}`);
    }

    const runningInstances = Array.from(this.instances.values())
      .filter(instance => instance.workflowId === id && instance.status === InstanceStatus.RUNNING);

    if (runningInstances.length > 0) {
      throw new Error(`无法删除工作流,存在 ${runningInstances.length} 个运行中的实例`);
    }

    this.workflows.delete(id);

    await this.auditLogger.log({
      action: 'workflow.deleted',
      resourceType: 'workflow',
      resourceId: id,
      details: { name: workflow.name }
    });

    this.eventBus.emit('workflow.deleted', { workflowId: id });
  }

  /** Returns the workflow with the given id, or `null` when it does not exist. */
  async getWorkflow(id: string): Promise<Workflow | null> {
    return this.workflows.get(id) ?? null;
  }

  /**
   * Lists workflows filtered by status and a case-insensitive search over
   * name and description, then sorted and paginated as requested.
   */
  async getWorkflows(query: WorkflowQuery = {}): Promise<Workflow[]> {
    let workflows = Array.from(this.workflows.values());

    if (query.status) {
      workflows = workflows.filter(w => w.status === query.status);
    }

    if (query.search) {
      const searchLower = query.search.toLowerCase();
      workflows = workflows.filter(w =>
        w.name.toLowerCase().includes(searchLower) ||
        (w.description && w.description.toLowerCase().includes(searchLower))
      );
    }

    return this.sortAndPaginate(workflows, query);
  }

  /** Returns the instance with the given id, or `null` when it does not exist. */
  async getInstance(instanceId: string): Promise<WorkflowInstance | null> {
    return this.instances.get(instanceId) ?? null;
  }

  /**
   * Lists instances filtered by workflow id, status and starter, then sorted
   * and paginated as requested.
   */
  async getInstances(query: InstanceQuery = {}): Promise<WorkflowInstance[]> {
    let instances = Array.from(this.instances.values());

    if (query.workflowId) {
      instances = instances.filter(i => i.workflowId === query.workflowId);
    }

    if (query.status) {
      instances = instances.filter(i => i.status === query.status);
    }

    if (query.startedBy) {
      instances = instances.filter(i => i.context.startedBy === query.startedBy);
    }

    return this.sortAndPaginate(instances, query);
  }

  /**
   * Applies the shared `sortBy` / `sortOrder` / `offset` / `limit` query
   * options to a list.
   *
   * The comparator returns 0 for equal keys, as `Array.prototype.sort`
   * requires, and `offset` is honoured even when no `limit` is given.
   *
   * @param items A list owned by the caller; it is sorted in place.
   */
  private sortAndPaginate<T>(
    items: T[],
    query: { sortBy?: string; sortOrder?: 'asc' | 'desc'; limit?: number; offset?: number }
  ): T[] {
    const { sortBy, sortOrder, limit } = query;

    if (sortBy) {
      const direction = sortOrder === 'desc' ? -1 : 1;
      items.sort((a, b) => {
        const aValue = (a as any)[sortBy];
        const bValue = (b as any)[sortBy];
        if (aValue > bValue) return direction;
        if (aValue < bValue) return -direction;
        return 0;
      });
    }

    const offset = query.offset ?? 0;
    if (!limit && offset === 0) {
      return items;
    }
    return items.slice(offset, limit ? offset + limit : undefined);
  }

  /** Registers a handler for an event on the engine's event bus. */
  on(event: string, handler: WorkflowEventHandler): void {
    this.eventBus.on(event, handler);
  }

  /** Removes a previously registered event handler. */
  off(event: string, handler: WorkflowEventHandler): void {
    this.eventBus.off(event, handler);
  }

  /** Publishes an event on the engine's event bus. */
  emit(event: string, data: any): void {
    this.eventBus.emit(event, data);
  }
}

12.2 核心数据结构

12.2.1 工作流相关接口

// Workflow definition: the steps, the connections between them and optional
// variables, triggers, settings and metadata.
interface WorkflowDefinition {
  name: string;
  description?: string;
  version?: number;
  steps: StepDefinition[];
  connections: ConnectionDefinition[];
  variables?: VariableDefinition[];
  triggers?: TriggerDefinition[];
  settings?: WorkflowSettings;
  metadata?: Record<string, any>;
}

// Workflow: a stored definition together with its identity, version,
// publication status and timestamps.
interface Workflow {
  id: string;
  name: string;
  description?: string;
  version: number;
  status: WorkflowStatus;
  definition: WorkflowDefinition;
  createdAt: Date;
  updatedAt: Date;
  createdBy?: string;
  tags?: string[];
}

// Workflow status
enum WorkflowStatus {
  DRAFT = 'draft',
  PUBLISHED = 'published',
  DEPRECATED = 'deprecated',
  ARCHIVED = 'archived'
}

// Step definition: one node of a workflow.
interface StepDefinition {
  id: string;
  name: string;
  description?: string;
  type: StepType;
  config: StepConfig;
  // Maps input names to either a literal string or a `${expression}` value.
  inputMapping?: Record<string, string>;
  outputMapping?: Record<string, string>;
  condition?: string;
  timeout?: number;
  retryPolicy?: RetryPolicy;
  errorHandling?: ErrorHandling;
  position?: Position;
}

// Step type
enum StepType {
  START = 'start',
  END = 'end',
  TASK = 'task',
  DECISION = 'decision',
  PARALLEL = 'parallel',
  LOOP = 'loop',
  SUBPROCESS = 'subprocess',
  SCRIPT = 'script',
  HTTP = 'http',
  EMAIL = 'email',
  APPROVAL = 'approval',
  TIMER = 'timer',
  EVENT = 'event'
}

// Step configuration: free-form key/value settings; step handlers cast it to
// their own config type.
interface StepConfig {
  [key: string]: any;
}

// Connection definition: a directed edge from step `from` to step `to`.
interface ConnectionDefinition {
  id: string;
  from: string;
  to: string;
  // Optional expression; a connection without a condition always matches.
  condition?: string;
  label?: string;
}

// Variable definition
interface VariableDefinition {
  name: string;
  type: VariableType;
  defaultValue?: any;
  description?: string;
  required?: boolean;
}

// Variable type
enum VariableType {
  STRING = 'string',
  NUMBER = 'number',
  BOOLEAN = 'boolean',
  DATE = 'date',
  OBJECT = 'object',
  ARRAY = 'array'
}

// Trigger definition
interface TriggerDefinition {
  id: string;
  type: TriggerType;
  config: TriggerConfig;
  enabled: boolean;
}

// Trigger type
enum TriggerType {
  MANUAL = 'manual',
  SCHEDULE = 'schedule',
  EVENT = 'event',
  WEBHOOK = 'webhook',
  FILE = 'file',
  EMAIL = 'email'
}

// Trigger configuration: free-form key/value settings.
interface TriggerConfig {
  [key: string]: any;
}

// Workflow settings
interface WorkflowSettings {
  maxConcurrentInstances?: number;
  instanceTimeout?: number;
  enableAudit?: boolean;
  enableMetrics?: boolean;
  notificationSettings?: NotificationSettings;
}

// Notification settings: notification configs grouped by lifecycle moment.
interface NotificationSettings {
  onStart?: NotificationConfig[];
  onComplete?: NotificationConfig[];
  onError?: NotificationConfig[];
}

// Notification configuration
interface NotificationConfig {
  type: NotificationType;
  recipients: string[];
  template?: string;
}

// Notification type
enum NotificationType {
  EMAIL = 'email',
  SMS = 'sms',
  WEBHOOK = 'webhook',
  SLACK = 'slack'
}

// Retry policy
interface RetryPolicy {
  // Number of retries allowed after a step failure.
  maxRetries: number;
  // Delay before a retry; the engine passes it to setTimeout (milliseconds).
  delay: number;
  backoffMultiplier?: number;
  maxDelay?: number;
}

// Error handling
interface ErrorHandling {
  strategy: ErrorHandlingStrategy;
  // Step to run when the strategy is COMPENSATE.
  compensationStep?: string;
  continueOnError?: boolean;
}

// Error handling strategy
enum ErrorHandlingStrategy {
  ABORT = 'abort',
  RETRY = 'retry',
  CONTINUE = 'continue',
  COMPENSATE = 'compensate'
}

// Position information (x/y coordinates)
interface Position {
  x: number;
  y: number;
}

12.2.2 工作流实例相关接口

// Workflow instance: one execution of a workflow.
interface WorkflowInstance {
  id: string;
  workflowId: string;
  workflowVersion: number;
  status: InstanceStatus;
  input: Record<string, any>;
  output: Record<string, any>;
  context: WorkflowContext;
  // Id of the step currently being executed; null before the first step.
  currentStep: string | null;
  // Step instances keyed by step definition id.
  steps: Map<string, StepInstance>;
  // Seeded from the input; completed step outputs are merged in.
  variables: Record<string, any>;
  error?: string;
  createdAt: Date;
  updatedAt: Date;
  startedAt?: Date;
  completedAt?: Date;
  duration?: number;
}

// Instance status
enum InstanceStatus {
  PENDING = 'pending',
  RUNNING = 'running',
  PAUSED = 'paused',
  COMPLETED = 'completed',
  FAILED = 'failed',
  CANCELLED = 'cancelled'
}

// Workflow context: caller-supplied information about an execution.
interface WorkflowContext {
  userId?: string;
  tenantId?: string;
  correlationId?: string;
  priority?: number;
  tags?: string[];
  metadata?: Record<string, any>;
  // Filled in by the engine on start: `userId` or 'system', and the start time.
  startedBy?: string;
  startedAt?: Date;
}

// Step instance: the runtime record of one step within a workflow instance.
interface StepInstance {
  id: string;
  stepId: string;
  instanceId: string;
  status: StepStatus;
  input: Record<string, any>;
  output: Record<string, any>;
  error?: string;
  startedAt: Date;
  completedAt?: Date;
  // The executor sets this to completedAt - startedAt, in milliseconds.
  duration?: number;
  // Starts at 0; the engine increments it for every step-failed event.
  attempts: number;
  assignee?: string;
}

// Step status
enum StepStatus {
  PENDING = 'pending',
  RUNNING = 'running',
  WAITING = 'waiting',
  COMPLETED = 'completed',
  FAILED = 'failed',
  SKIPPED = 'skipped',
  CANCELLED = 'cancelled'
}

// Step result returned by a step handler.
interface StepResult {
  success: boolean;
  output?: Record<string, any>;
  error?: string;
  nextStep?: string;
}

// Task: a unit of work created for a step of a workflow instance.
interface Task {
  id: string;
  instanceId: string;
  stepId: string;
  name: string;
  description?: string;
  type: TaskType;
  status: TaskStatus;
  assignee?: string;
  assignedAt?: Date;
  dueDate?: Date;
  priority: TaskPriority;
  input: Record<string, any>;
  output?: Record<string, any>;
  createdAt: Date;
  updatedAt: Date;
  completedAt?: Date;
}

// Task type
enum TaskType {
  USER_TASK = 'user_task',
  APPROVAL = 'approval',
  REVIEW = 'review',
  FORM = 'form',
  DECISION = 'decision'
}

// Task status
enum TaskStatus {
  CREATED = 'created',
  ASSIGNED = 'assigned',
  IN_PROGRESS = 'in_progress',
  COMPLETED = 'completed',
  CANCELLED = 'cancelled',
  EXPIRED = 'expired'
}

// Task priority
enum TaskPriority {
  LOW = 'low',
  NORMAL = 'normal',
  HIGH = 'high',
  URGENT = 'urgent'
}

// Workflow event: one entry of an instance's history.
interface WorkflowEvent {
  id: string;
  instanceId: string;
  type: WorkflowEventType;
  stepId?: string;
  data: Record<string, any>;
  timestamp: Date;
  userId?: string;
}

// Workflow event type
enum WorkflowEventType {
  INSTANCE_STARTED = 'instance_started',
  INSTANCE_COMPLETED = 'instance_completed',
  INSTANCE_FAILED = 'instance_failed',
  INSTANCE_CANCELLED = 'instance_cancelled',
  STEP_STARTED = 'step_started',
  STEP_COMPLETED = 'step_completed',
  STEP_FAILED = 'step_failed',
  STEP_SKIPPED = 'step_skipped',
  TASK_CREATED = 'task_created',
  TASK_ASSIGNED = 'task_assigned',
  TASK_COMPLETED = 'task_completed'
}

// Instance metrics
interface InstanceMetrics {
  totalSteps: number;
  completedSteps: number;
  failedSteps: number;
  skippedSteps: number;
  duration: number;
  averageStepDuration: number;
  bottleneckSteps: string[];
}

// Query interfaces
// Filter, sort and pagination options for listing workflows.
interface WorkflowQuery {
  status?: WorkflowStatus;
  // Matched case-insensitively against workflow name and description.
  search?: string;
  tags?: string[];
  createdBy?: string;
  // Name of the workflow property to sort by.
  sortBy?: string;
  sortOrder?: 'asc' | 'desc';
  limit?: number;
  offset?: number;
}

// Filter, sort and pagination options for listing workflow instances.
interface InstanceQuery {
  workflowId?: string;
  status?: InstanceStatus;
  // Compared with the instance's `context.startedBy`.
  startedBy?: string;
  dateRange?: DateRange;
  // Name of the instance property to sort by.
  sortBy?: string;
  sortOrder?: 'asc' | 'desc';
  limit?: number;
  offset?: number;
}

// Filter, sort and pagination options for listing tasks.
interface TaskQuery {
  assignee?: string;
  status?: TaskStatus;
  type?: TaskType;
  priority?: TaskPriority;
  dueDate?: Date;
  sortBy?: string;
  sortOrder?: 'asc' | 'desc';
  limit?: number;
  offset?: number;
}

// Date range with a start and an end date.
interface DateRange {
  start: Date;
  end: Date;
}

// Workflow engine options; only `auditLogger` is required.
interface WorkflowEngineOptions {
  eventBus?: EventBus;
  auditLogger: AuditLogger;
  scheduler?: WorkflowScheduler;
  executor?: WorkflowExecutor;
  storage?: WorkflowStorage;
}

// Validation result
interface ValidationResult {
  // The engine's validator sets this to true exactly when `errors` is empty.
  isValid: boolean;
  errors: string[];
  warnings: string[];
}

// Event handler; may be synchronous or return a promise.
type WorkflowEventHandler = (event: any) => void | Promise<void>;

12.3 工作流执行器

12.3.1 执行器接口和实现

// Workflow executor interface: runs a step through the handler registered for
// its step type.
interface WorkflowExecutor {
  executeStep(step: StepDefinition, stepInstance: StepInstance, workflowInstance: WorkflowInstance): Promise<StepResult>;
  registerStepHandler(type: StepType, handler: StepHandler): void;
  unregisterStepHandler(type: StepType): void;
}

// Step handler interface: implements the behaviour of one step type.
// `validate` and `getSchema` are optional.
interface StepHandler {
  execute(step: StepDefinition, stepInstance: StepInstance, workflowInstance: WorkflowInstance): Promise<StepResult>;
  validate?(step: StepDefinition): ValidationResult;
  getSchema?(): any;
}

/**
 * Workflow executor implementation.
 *
 * Dispatches each step to the handler registered for its type, maintains the
 * step instance's status, timing and output, and publishes
 * `workflow.step.started` / `workflow.step.completed` / `workflow.step.failed`
 * on the event bus.
 */
class LowCodeWorkflowExecutor implements WorkflowExecutor {
  private stepHandlers: Map<StepType, StepHandler> = new Map();
  private eventBus: EventBus;

  /** @param eventBus Bus on which step lifecycle events are published. */
  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
    this.registerBuiltinHandlers();
  }

  /** Registers the built-in handlers, one per supported step type. */
  private registerBuiltinHandlers(): void {
    this.registerStepHandler(StepType.START, new StartStepHandler());
    this.registerStepHandler(StepType.END, new EndStepHandler());
    this.registerStepHandler(StepType.TASK, new TaskStepHandler());
    this.registerStepHandler(StepType.DECISION, new DecisionStepHandler());
    this.registerStepHandler(StepType.SCRIPT, new ScriptStepHandler());
    this.registerStepHandler(StepType.HTTP, new HttpStepHandler());
    this.registerStepHandler(StepType.EMAIL, new EmailStepHandler());
    this.registerStepHandler(StepType.APPROVAL, new ApprovalStepHandler());
    this.registerStepHandler(StepType.TIMER, new TimerStepHandler());
  }

  /**
   * Executes one step through its registered handler.
   *
   * A handler that returns successfully but leaves the step in WAITING (for
   * example a task awaiting external completion) keeps that status: the step
   * is not marked completed and no completion event is published.
   *
   * @returns The handler's result.
   * @throws Error when no handler is registered for the step type (thrown
   *   before any state change or event), or whatever the handler throws
   *   (after the step has been marked failed and `workflow.step.failed`
   *   published).
   */
  async executeStep(
    step: StepDefinition,
    stepInstance: StepInstance,
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const handler = this.stepHandlers.get(step.type);
    if (!handler) {
      throw new Error(`不支持的步骤类型: ${step.type}`);
    }

    try {
      stepInstance.status = StepStatus.RUNNING;
      stepInstance.startedAt = new Date();

      this.eventBus.emit('workflow.step.started', {
        instanceId: workflowInstance.id,
        stepId: step.id,
        stepInstance
      });

      const result = await handler.execute(step, stepInstance, workflowInstance);

      if (result.success && stepInstance.status === StepStatus.WAITING) {
        // The handler parked the step; completion is signalled later from outside.
        stepInstance.output = result.output || {};
        return result;
      }

      stepInstance.status = result.success ? StepStatus.COMPLETED : StepStatus.FAILED;
      stepInstance.output = result.output || {};
      stepInstance.completedAt = new Date();
      stepInstance.duration = stepInstance.completedAt.getTime() - stepInstance.startedAt.getTime();

      if (result.error) {
        stepInstance.error = result.error;
      }

      if (result.success) {
        this.eventBus.emit('workflow.step.completed', {
          instanceId: workflowInstance.id,
          stepId: step.id,
          output: result.output
        });
      } else {
        this.eventBus.emit('workflow.step.failed', {
          instanceId: workflowInstance.id,
          stepId: step.id,
          error: result.error
        });
      }

      return result;
    } catch (error) {
      // Anything can be thrown, so derive the message defensively.
      const message = error instanceof Error ? error.message : String(error);

      stepInstance.status = StepStatus.FAILED;
      stepInstance.error = message;
      stepInstance.completedAt = new Date();
      stepInstance.duration = stepInstance.completedAt.getTime() - stepInstance.startedAt.getTime();

      this.eventBus.emit('workflow.step.failed', {
        instanceId: workflowInstance.id,
        stepId: step.id,
        error: message
      });

      throw error;
    }
  }

  /** Registers (or replaces) the handler for a step type. */
  registerStepHandler(type: StepType, handler: StepHandler): void {
    this.stepHandlers.set(type, handler);
  }

  /** Removes the handler for a step type, if any. */
  unregisterStepHandler(type: StepType): void {
    this.stepHandlers.delete(type);
  }
}

12.3.2 内置步骤处理器

/**
 * Start step handler. The start step only marks the beginning of a workflow;
 * it performs no work of its own.
 */
class StartStepHandler implements StepHandler {
  /**
   * @returns A successful result whose output carries the current time as an
   *   ISO string and the workflow instance id.
   */
  async execute(
    step: StepDefinition,
    stepInstance: StepInstance,
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const startedAt = new Date().toISOString();
    const output = { startedAt, instanceId: workflowInstance.id };
    return { success: true, output };
  }
}

/**
 * End step handler. The end step marks the workflow as finished.
 */
class EndStepHandler implements StepHandler {
  /**
   * @returns A successful result whose output carries the current time as an
   *   ISO string and the milliseconds elapsed since the instance's `createdAt`.
   */
  async execute(
    step: StepDefinition,
    stepInstance: StepInstance,
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const completedAt = new Date().toISOString();
    const duration = Date.now() - workflowInstance.createdAt.getTime();
    return {
      success: true,
      output: { completedAt, duration }
    };
  }
}

/**
 * Task step handler. Builds a task from the step configuration and parks the
 * step in WAITING until the task is completed externally.
 */
class TaskStepHandler implements StepHandler {
  /**
   * Builds the task (assigned immediately when the config names an assignee)
   * and marks the step instance as waiting.
   *
   * @returns A successful result whose output carries the generated task id
   *   and the status `'waiting_for_completion'`.
   */
  async execute(
    step: StepDefinition,
    stepInstance: StepInstance,
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const taskConfig = step.config as TaskStepConfig;

    // Config values win; step name/description and default type/priority fill the gaps.
    const newTask: Task = {
      id: this.generateId(),
      instanceId: workflowInstance.id,
      stepId: step.id,
      name: taskConfig.taskName || step.name,
      description: taskConfig.taskDescription || step.description,
      type: taskConfig.taskType || TaskType.USER_TASK,
      status: TaskStatus.CREATED,
      assignee: taskConfig.assignee,
      dueDate: taskConfig.dueDate ? new Date(taskConfig.dueDate) : undefined,
      priority: taskConfig.priority || TaskPriority.NORMAL,
      input: stepInstance.input,
      createdAt: new Date(),
      updatedAt: new Date()
    };

    // A preconfigured assignee means the task starts out assigned.
    const hasAssignee = Boolean(newTask.assignee);
    if (hasAssignee) {
      newTask.status = TaskStatus.ASSIGNED;
      newTask.assignedAt = new Date();
    }

    // The task should be persisted to the task management system here:
    // await taskManager.createTask(task);

    // The step stays open until the task is completed from outside.
    stepInstance.status = StepStatus.WAITING;

    const output = {
      taskId: newTask.id,
      status: 'waiting_for_completion'
    };
    return { success: true, output };
  }

  /** Generates a `task_`-prefixed id with up to nine random base-36 characters. */
  private generateId(): string {
    const randomPart = Math.random().toString(36).slice(2, 11);
    return 'task_' + randomPart;
  }
}

/**
 * Step handler for decision (branching) steps.
 *
 * Evaluates the configured condition expressions against the workflow's
 * variables, the step input and the workflow context, and selects the branch
 * of the first condition that holds, or the default branch if none does.
 */
class DecisionStepHandler implements StepHandler {
  /**
   * @param step Step definition; `step.config` is read as a `DecisionStepConfig`.
   * @param stepInstance Runtime state of the step; its `input` is exposed to
   *   the expressions as `input`.
   * @param workflowInstance Owning instance; its `variables` and `context` are
   *   exposed to the expressions as `variables` and `context`.
   * @returns A successful result whose output lists the selected branch and the
   *   boolean outcome of every condition; `nextStep` is the selected branch, or
   *   `undefined` when nothing was selected.
   */
  async execute(
    step: StepDefinition, 
    stepInstance: StepInstance, 
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const config = step.config as DecisionStepConfig;
    
    // Names made visible to the condition expressions.
    const context = {
      variables: workflowInstance.variables,
      input: stepInstance.input,
      context: workflowInstance.context
    };
    
    let selectedBranch: string | null = null;
    const evaluatedConditions: { expression: string; result: boolean }[] = [];
    
    // Evaluate each condition exactly once, in order. The same result feeds
    // both the branch decision and the report, so an expression with side
    // effects cannot run twice or yield two different answers.
    for (const condition of config.conditions) {
      const result = this.evaluateCondition(condition.expression, context);
      evaluatedConditions.push({ expression: condition.expression, result });
      
      // Only the first matching condition decides the branch.
      if (result && selectedBranch === null) {
        selectedBranch = condition.branch;
      }
    }
    
    // Fall back to the default branch when nothing matched.
    if (!selectedBranch && config.defaultBranch) {
      selectedBranch = config.defaultBranch;
    }
    
    return {
      success: true,
      output: {
        selectedBranch,
        evaluatedConditions
      },
      nextStep: selectedBranch || undefined
    };
  }
  
  /**
   * Evaluates one condition expression with the members of `context` in scope.
   *
   * SECURITY: the expression is compiled with `new Function`, so it runs as
   * arbitrary JavaScript with full access to the host process. Only use this
   * with expressions from trusted workflow authors, or replace it with a
   * sandboxed expression evaluator.
   *
   * @returns The truthiness of the expression, or `false` when evaluation throws.
   */
  private evaluateCondition(expression: string, context: any): boolean {
    try {
      const value: unknown = new Function('context', `with(context) { return ${expression}; }`)(context);
      // Expressions may yield any value; normalise to the declared boolean.
      return Boolean(value);
    } catch (error) {
      console.error('条件评估失败:', error);
      return false;
    }
  }
}

/**
 * Step handler for script steps.
 *
 * Runs the configured script as the body of a function that receives a single
 * `context` argument and uses the script's return value as the step output.
 *
 * SECURITY: the script is compiled with `new Function`, so it runs as
 * arbitrary JavaScript with full access to the host process. Only execute
 * scripts from trusted workflow authors, or move execution into a sandbox.
 *
 * NOTE: `ScriptStepConfig.timeout` is not enforced by this handler.
 */
class ScriptStepHandler implements StepHandler {
  /**
   * @param step Step definition; `step.config` is read as a `ScriptStepConfig`.
   * @param stepInstance Runtime state of the step; its `input` is exposed as
   *   `context.input`.
   * @param workflowInstance Owning instance; its `variables` and `context` are
   *   exposed as `context.variables` and `context.context`.
   * @returns `{ success: true, output }` with the script's (awaited) return
   *   value, or `{}` when that value is falsy; `{ success: false, error }` with
   *   a message string when the script throws or its promise rejects.
   */
  async execute(
    step: StepDefinition, 
    stepInstance: StepInstance, 
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const config = step.config as ScriptStepConfig;
    
    try {
      const context = {
        variables: workflowInstance.variables,
        input: stepInstance.input,
        context: workflowInstance.context,
        // Prefixed console so script output is recognisable in the logs.
        console: {
          log: (...args: any[]) => console.log('[Script]', ...args),
          error: (...args: any[]) => console.error('[Script]', ...args)
        }
      };
      
      // Run the script body.
      const result = new Function('context', config.script)(context);
      
      // `await` passes plain values through and settles native promises as
      // well as foreign thenables, which an `instanceof Promise` test misses.
      const output = await result;
      
      return {
        success: true,
        output: output || {}
      };
    } catch (error) {
      // Scripts can throw anything, not only Error instances.
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }
}

/**
 * Step handler for HTTP steps.
 *
 * Resolves `${...}` placeholders in the configured URL, headers and body,
 * sends the request with `fetch`, and returns the status, headers and parsed
 * JSON body of a successful response.
 */
class HttpStepHandler implements StepHandler {
  /**
   * @param step Step definition; `step.config` is read as an `HttpStepConfig`.
   * @param stepInstance Runtime state of the step; its `input` is available to
   *   templates as `input`.
   * @param workflowInstance Owning instance; its `variables` are available to
   *   templates as `variables`.
   * @returns `{ success: true, output: { status, headers, data } }` for a 2xx
   *   response with a JSON body; otherwise `{ success: false, error }` where
   *   `error` describes the HTTP status, the timeout, or the thrown failure.
   */
  async execute(
    step: StepDefinition, 
    stepInstance: StepInstance, 
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const config = step.config as HttpStepConfig;
    
    // `fetch` has no `timeout` option; the limit is enforced by aborting the
    // request through an AbortController once the timer fires.
    const controller = new AbortController();
    let timer: ReturnType<typeof setTimeout> | undefined;
    let timeoutMs = 30000;
    
    try {
      timeoutMs = config.timeout || 30000;
      timer = setTimeout(() => controller.abort(), timeoutMs);
      
      // Prepare the request parameters.
      const url = this.resolveTemplate(config.url, stepInstance.input, workflowInstance.variables);
      const headers = this.resolveHeaders(config.headers, stepInstance.input, workflowInstance.variables);
      const body = config.body ? this.resolveTemplate(config.body, stepInstance.input, workflowInstance.variables) : undefined;
      
      // Send the HTTP request.
      const response = await fetch(url, {
        method: config.method || 'GET',
        headers,
        body: body ? JSON.stringify(body) : undefined,
        signal: controller.signal
      });
      
      // Check the status before parsing: error responses are often not JSON,
      // and a parse failure would otherwise hide the real HTTP status.
      if (!response.ok) {
        // Drain the body (best effort) so the connection can be released.
        await response.text().catch(() => undefined);
        return {
          success: false,
          error: `HTTP请求失败: ${response.status} ${response.statusText}`
        };
      }
      
      const responseData = await response.json();
      
      return {
        success: true,
        output: {
          status: response.status,
          headers: Object.fromEntries(response.headers.entries()),
          data: responseData
        }
      };
    } catch (error) {
      if (controller.signal.aborted) {
        return {
          success: false,
          error: `HTTP请求超时: ${timeoutMs}ms`
        };
      }
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    } finally {
      // Always release the timer, including on early returns and failures.
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }
  
  /**
   * Replaces every `${expression}` in `template` with the value of that
   * expression, evaluated with `input` and `variables` in scope. Non-string
   * templates are returned unchanged; a placeholder whose expression throws is
   * left as written.
   *
   * SECURITY: expressions are compiled with `new Function` and run as
   * arbitrary JavaScript. Templates must come from trusted workflow authors.
   */
  private resolveTemplate(template: string, input: any, variables: any): any {
    if (typeof template !== 'string') {
      return template;
    }
    
    return template.replace(/\${([^}]+)}/g, (match, expression) => {
      try {
        const context = { input, variables };
        return new Function('context', `with(context) { return ${expression}; }`)(context);
      } catch (error) {
        console.error('模板解析失败:', error);
        return match;
      }
    });
  }
  
  /**
   * Resolves templates in every header value. When no headers are configured,
   * a JSON `Content-Type` header is used instead.
   */
  private resolveHeaders(headers: Record<string, string> | undefined, input: any, variables: any): Record<string, string> {
    if (!headers) {
      return { 'Content-Type': 'application/json' };
    }
    
    const resolvedHeaders: Record<string, string> = {};
    for (const [key, value] of Object.entries(headers)) {
      resolvedHeaders[key] = this.resolveTemplate(value, input, variables);
    }
    
    return resolvedHeaders;
  }
}

/**
 * Step handler for e-mail steps.
 *
 * Resolves `${...}` placeholders in the recipient, subject and body. No mail
 * is actually sent: the resolved message is only logged, and the mail-service
 * call is left as a placeholder. `from`, `cc` and `bcc` are not read.
 */
class EmailStepHandler implements StepHandler {
  /**
   * @param step Step definition; `step.config` is read as an `EmailStepConfig`.
   * @param stepInstance Runtime state of the step; its `input` is available to
   *   templates as `input`.
   * @param workflowInstance Owning instance; its `variables` are available to
   *   templates as `variables`.
   * @returns `{ success: true, output: { to, subject, sentAt } }`, or
   *   `{ success: false, error }` with a message string if anything throws.
   */
  async execute(
    step: StepDefinition, 
    stepInstance: StepInstance, 
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const config = step.config as EmailStepConfig;
    
    try {
      // Resolve the message fields.
      const to = this.resolveTemplate(config.to, stepInstance.input, workflowInstance.variables);
      const subject = this.resolveTemplate(config.subject, stepInstance.input, workflowInstance.variables);
      const body = this.resolveTemplate(config.body, stepInstance.input, workflowInstance.variables);
      
      // A real mail service should be integrated here, e.g.:
      // await emailService.sendEmail({ to, subject, body });
      
      console.log('发送邮件:', { to, subject, body });
      
      return {
        success: true,
        output: {
          to,
          subject,
          sentAt: new Date().toISOString()
        }
      };
    } catch (error) {
      // The caught value is `unknown`; narrow it instead of assuming an Error.
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }
  
  /**
   * Replaces every `${expression}` in `template` with the value of that
   * expression, evaluated with `input` and `variables` in scope. Non-string
   * templates are converted with `String(...)`; a placeholder whose expression
   * throws is left as written.
   *
   * SECURITY: expressions are compiled with `new Function` and run as
   * arbitrary JavaScript. Templates must come from trusted workflow authors.
   */
  private resolveTemplate(template: string, input: any, variables: any): string {
    if (typeof template !== 'string') {
      return String(template);
    }
    
    return template.replace(/\${([^}]+)}/g, (match, expression) => {
      try {
        const context = { input, variables };
        return new Function('context', `with(context) { return ${expression}; }`)(context);
      } catch (error) {
        console.error('模板解析失败:', error);
        return match;
      }
    });
  }
}

/**
 * Step handler for approval steps.
 *
 * Builds an approval {@link Task} from the step's `ApprovalStepConfig`, marks
 * the step instance as waiting, and reports the generated task id. The task
 * record is only built in memory by this handler.
 */
class ApprovalStepHandler implements StepHandler {
  /**
   * @param step Step definition; `step.config` is read as an
   *   `ApprovalStepConfig` and `step.name` is used for the default title.
   * @param stepInstance Runtime state of the step; its `input` is merged into
   *   the task input and its `status` is set to `StepStatus.WAITING`.
   * @param workflowInstance Owning workflow instance (supplies `instanceId`).
   * @returns A successful result carrying `approvalTaskId` and the literal
   *   status `'waiting_for_approval'`.
   */
  async execute(
    step: StepDefinition,
    stepInstance: StepInstance,
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const approvalConfig = step.config as ApprovalStepConfig;
    const taskId = this.generateId();

    const approvalTask: Task = {
      id: taskId,
      instanceId: workflowInstance.id,
      stepId: step.id,
      name: approvalConfig.approvalTitle || `${step.name} - 审批`,
      description: approvalConfig.approvalDescription,
      type: TaskType.APPROVAL,
      status: TaskStatus.CREATED,
      assignee: approvalConfig.approver,
      dueDate: approvalConfig.dueDate ? new Date(approvalConfig.dueDate) : undefined,
      priority: approvalConfig.priority || TaskPriority.NORMAL,
      // Step input plus what the approver needs in order to decide.
      input: {
        ...stepInstance.input,
        approvalOptions: approvalConfig.approvalOptions || ['approve', 'reject'],
        approvalForm: approvalConfig.approvalForm
      },
      createdAt: new Date(),
      updatedAt: new Date()
    };

    // A configured approver means the task starts out already assigned.
    if (approvalTask.assignee) {
      Object.assign(approvalTask, {
        status: TaskStatus.ASSIGNED,
        assignedAt: new Date()
      });
    }

    // The step stays parked until the approval is completed externally.
    stepInstance.status = StepStatus.WAITING;

    const output = {
      approvalTaskId: approvalTask.id,
      status: 'waiting_for_approval'
    };
    return { success: true, output };
  }

  /** Returns `'approval_'` followed by up to nine random base-36 characters. */
  private generateId(): string {
    const randomPart = Math.random().toString(36).slice(2, 11);
    return `approval_${randomPart}`;
  }
}

/**
 * Step handler for timer steps.
 *
 * Waits in-process with `setTimeout` for the configured delay and then
 * completes successfully.
 */
class TimerStepHandler implements StepHandler {
  /** Delay in milliseconds used when the step configures none. */
  private static readonly DEFAULT_DELAY_MS = 1000;

  /**
   * @param step Step definition; `step.config` is read as a `TimerStepConfig`.
   * @param stepInstance Runtime state of the step (not read).
   * @param workflowInstance Owning workflow instance (not read).
   * @returns A promise that resolves after the delay with
   *   `{ success: true, output: { delayMs, completedAt } }`.
   */
  async execute(
    step: StepDefinition, 
    stepInstance: StepInstance, 
    workflowInstance: WorkflowInstance
  ): Promise<StepResult> {
    const config = step.config as TimerStepConfig;
    
    // Only a missing or NaN delay falls back to the default. An explicit 0 is
    // a valid "continue immediately" and must not be replaced, which a plain
    // `||` fallback would do.
    const configured = config.delay;
    const delay = typeof configured === 'number' && !Number.isNaN(configured)
      ? configured
      : TimerStepHandler.DEFAULT_DELAY_MS;
    
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve({
          success: true,
          output: {
            delayMs: delay,
            completedAt: new Date().toISOString()
          }
        });
      }, delay);
    });
  }
}

// Step configuration interfaces: the shape each step handler expects to find
// in `step.config` for its step type.

/** Configuration read by TaskStepHandler. */
interface TaskStepConfig {
  /** Task name; the handler falls back to the step's name when empty. */
  taskName?: string;
  /** Task description; the handler falls back to the step's description when empty. */
  taskDescription?: string;
  /** Task type; the handler falls back to `TaskType.USER_TASK` when unset. */
  taskType?: TaskType;
  /** When set, the task is created with status `TaskStatus.ASSIGNED`. */
  assignee?: string;
  /** Due date as a string that the handler passes to `new Date(...)`. */
  dueDate?: string;
  /** Task priority; the handler falls back to `TaskPriority.NORMAL` when unset. */
  priority?: TaskPriority;
}

/** Configuration read by DecisionStepHandler. */
interface DecisionStepConfig {
  /** Conditions checked in array order; the first one that holds selects its branch. */
  conditions: {
    /** JavaScript expression evaluated with `variables`, `input` and `context` in scope. */
    expression: string;
    /** Value reported as the selected branch / next step when the expression holds. */
    branch: string;
  }[];
  /** Branch used when no condition holds. */
  defaultBranch?: string;
}

/** Configuration read by ScriptStepHandler. */
interface ScriptStepConfig {
  /** JavaScript function body, invoked with a single `context` argument. */
  script: string;
  // NOTE(review): ScriptStepHandler.execute does not read this field; unit is
  // presumably milliseconds — confirm before relying on it.
  timeout?: number;
}

/** Configuration read by HttpStepHandler. */
interface HttpStepConfig {
  /** Request URL; may contain `${...}` template placeholders. */
  url: string;
  /** HTTP method; the handler uses 'GET' when unset. */
  method?: string;
  /**
   * Request headers; values may contain `${...}` placeholders. When omitted,
   * the handler sends `Content-Type: application/json`.
   */
  headers?: Record<string, string>;
  /**
   * Request body. Strings go through template resolution; the resulting value
   * is serialized with `JSON.stringify`.
   */
  body?: any;
  /** Request timeout, presumably in milliseconds; the handler uses 30000 when unset. */
  timeout?: number;
}

/** Configuration read by EmailStepHandler. */
interface EmailStepConfig {
  /** Recipient; may contain `${...}` template placeholders. */
  to: string;
  /** Subject line; may contain `${...}` template placeholders. */
  subject: string;
  /** Message body; may contain `${...}` template placeholders. */
  body: string;
  // NOTE(review): `from`, `cc` and `bcc` are not read by EmailStepHandler.execute.
  from?: string;
  cc?: string;
  bcc?: string;
}

/** Configuration read by ApprovalStepHandler. */
interface ApprovalStepConfig {
  /** Becomes the approval task's assignee. */
  approver: string;
  /** Task name; the handler falls back to `${step.name} - 审批` when empty. */
  approvalTitle?: string;
  /** Task description. */
  approvalDescription?: string;
  /** Choices placed in the task input; the handler uses ['approve', 'reject'] when unset. */
  approvalOptions?: string[];
  /** Passed through unchanged in the task input as `approvalForm`. */
  approvalForm?: any;
  /** Due date as a string that the handler passes to `new Date(...)`. */
  dueDate?: string;
  /** Task priority; the handler falls back to `TaskPriority.NORMAL` when unset. */
  priority?: TaskPriority;
}

/** Configuration read by TimerStepHandler. */
interface TimerStepConfig {
  /**
   * Wait time handed to `setTimeout` (milliseconds). TimerStepHandler
   * substitutes 1000 when no delay is provided.
   */
  delay: number;
}

12.4 工作流调度器

12.4.1 调度器接口和实现

/**
 * Contract for a component that fires workflow triggers at computed times.
 */
interface WorkflowScheduler {
  /**
   * Registers `trigger` for the given workflow. In LowCodeWorkflowScheduler
   * the resolved string is the id of the newly created schedule.
   */
  schedule(trigger: TriggerDefinition, workflowId: string): Promise<string>;
  /** Removes the schedule with the given id. */
  unschedule(scheduleId: string): Promise<void>;
  /** Lists schedules, optionally restricted to a single workflow. */
  getSchedules(workflowId?: string): Promise<Schedule[]>;
  /** Starts the scheduler. */
  start(): Promise<void>;
  /** Stops the scheduler. */
  stop(): Promise<void>;
}

/** One registered trigger together with its run-time bookkeeping. */
interface Schedule {
  /** Schedule identifier. */
  id: string;
  /** Workflow this schedule belongs to. */
  workflowId: string;
  /** Trigger definition from which run times are computed. */
  trigger: TriggerDefinition;
  /** When the schedule is due to fire next. */
  nextRunTime: Date;
  /** When the schedule last fired; unset until the first run. */
  lastRunTime?: Date;
  /** Current lifecycle state. */
  status: ScheduleStatus;
  /** Creation timestamp. */
  createdAt: Date;
  /** Timestamp of the last modification. */
  updatedAt: Date;
}

/**
 * Lifecycle states of a schedule.
 *
 * NOTE(review): in LowCodeWorkflowScheduler only ACTIVE (assigned on creation
 * and checked in `start`) and DISABLED (set after a MANUAL trigger fires) are
 * used; PAUSED and EXPIRED are declared but not set there.
 */
enum ScheduleStatus {
  ACTIVE = 'active',
  PAUSED = 'paused',
  DISABLED = 'disabled',
  EXPIRED = 'expired'
}

/**
 * In-memory {@link WorkflowScheduler} built on `setTimeout`.
 *
 * Each schedule has at most one pending timer. When a schedule is due, the
 * scheduler emits `'workflow.trigger.fired'` on the event bus, computes the
 * next run time, and re-arms the timer. MANUAL triggers fire once and are then
 * marked DISABLED. Nothing is persisted; schedules live only in this object.
 */
class LowCodeWorkflowScheduler implements WorkflowScheduler {
  /**
   * Longest delay `setTimeout` can represent (2^31 - 1 ms, about 24.8 days).
   * Larger values overflow and fire almost immediately, so longer waits are
   * split into chained timers.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  private schedules: Map<string, Schedule> = new Map();
  // `ReturnType<typeof setTimeout>` works with both Node and DOM typings.
  private timers: Map<string, ReturnType<typeof setTimeout>> = new Map();
  private eventBus: EventBus;
  private isRunning = false;
  
  /** @param eventBus Bus on which `'workflow.trigger.fired'` is emitted. */
  constructor(eventBus: EventBus) {
    this.eventBus = eventBus;
  }
  
  /**
   * Registers a trigger for a workflow. If the scheduler is running, the
   * schedule is armed right away (and fires synchronously if already due).
   *
   * @returns The id of the new schedule.
   * @throws Error if the trigger holds an invalid cron expression or a
   *   negative interval.
   */
  async schedule(trigger: TriggerDefinition, workflowId: string): Promise<string> {
    const schedule: Schedule = {
      id: this.generateId(),
      workflowId,
      trigger,
      nextRunTime: this.calculateNextRunTime(trigger),
      status: ScheduleStatus.ACTIVE,
      createdAt: new Date(),
      updatedAt: new Date()
    };
    
    this.schedules.set(schedule.id, schedule);
    
    if (this.isRunning) {
      this.scheduleNext(schedule);
    }
    
    return schedule.id;
  }
  
  /** Cancels the pending timer (if any) and forgets the schedule. */
  async unschedule(scheduleId: string): Promise<void> {
    this.clearTimer(scheduleId);
    this.schedules.delete(scheduleId);
  }
  
  /** Returns all schedules, or only those of `workflowId` when given. */
  async getSchedules(workflowId?: string): Promise<Schedule[]> {
    let schedules = Array.from(this.schedules.values());
    
    if (workflowId) {
      schedules = schedules.filter(s => s.workflowId === workflowId);
    }
    
    return schedules;
  }
  
  /** Starts the scheduler and arms every ACTIVE schedule. No-op if running. */
  async start(): Promise<void> {
    if (this.isRunning) {
      return;
    }
    
    this.isRunning = true;
    
    for (const schedule of this.schedules.values()) {
      if (schedule.status === ScheduleStatus.ACTIVE) {
        this.scheduleNext(schedule);
      }
    }
  }
  
  /** Stops the scheduler and cancels all pending timers; schedules are kept. */
  async stop(): Promise<void> {
    this.isRunning = false;
    
    for (const timer of this.timers.values()) {
      clearTimeout(timer);
    }
    this.timers.clear();
  }
  
  /**
   * Arms a schedule: fires it immediately when it is already due, otherwise
   * sets a timer for the remaining time.
   */
  private scheduleNext(schedule: Schedule): void {
    this.clearTimer(schedule.id);
    
    const delay = schedule.nextRunTime.getTime() - Date.now();
    
    if (delay <= 0) {
      // Already due: run now. executeSchedule handles its own errors.
      void this.executeSchedule(schedule);
    } else {
      this.armTimer(schedule, delay);
    }
  }
  
  /**
   * Sets the single pending timer for a schedule. Waits beyond the
   * `setTimeout` limit are served in chunks: the timer wakes at the limit and
   * re-arms itself with the remaining time instead of firing.
   */
  private armTimer(schedule: Schedule, delay: number): void {
    this.clearTimer(schedule.id);
    
    const limit = LowCodeWorkflowScheduler.MAX_TIMER_DELAY_MS;
    const wait = Math.min(Math.max(delay, 0), limit);
    
    const timer = setTimeout(() => {
      // This timer has fired; drop the stale handle.
      this.timers.delete(schedule.id);
      
      if (delay > limit) {
        // Woke up early because the wait was clamped: keep waiting.
        this.armTimer(schedule, schedule.nextRunTime.getTime() - Date.now());
      } else {
        void this.executeSchedule(schedule);
      }
    }, wait);
    
    this.timers.set(schedule.id, timer);
  }
  
  /** Cancels and forgets the pending timer of a schedule, if there is one. */
  private clearTimer(scheduleId: string): void {
    const timer = this.timers.get(scheduleId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.timers.delete(scheduleId);
    }
  }
  
  /**
   * Fires a schedule: emits the trigger event, updates the bookkeeping, and
   * re-arms the schedule unless it is a one-shot MANUAL trigger. Errors are
   * logged and never propagate.
   */
  private async executeSchedule(schedule: Schedule): Promise<void> {
    try {
      // Tell listeners that the trigger has fired.
      this.eventBus.emit('workflow.trigger.fired', {
        scheduleId: schedule.id,
        workflowId: schedule.workflowId,
        trigger: schedule.trigger
      });
      
      // Update run-time bookkeeping.
      schedule.lastRunTime = new Date();
      schedule.nextRunTime = this.calculateNextRunTime(schedule.trigger);
      schedule.updatedAt = new Date();
      
      // One-shot triggers are disabled after firing.
      if (schedule.trigger.type === TriggerType.MANUAL) {
        schedule.status = ScheduleStatus.DISABLED;
        return;
      }
      
      // A listener may have stopped the scheduler or removed this schedule
      // while handling the event; re-arming then would resurrect it.
      if (!this.isRunning || this.schedules.get(schedule.id) !== schedule) {
        return;
      }
      
      // Always re-arm through a timer (never synchronously) so that repeated
      // firing cannot recurse on the call stack.
      this.armTimer(schedule, schedule.nextRunTime.getTime() - Date.now());
    } catch (error) {
      console.error('调度执行失败:', error);
    }
  }
  
  /**
   * Computes the next run time for a trigger: SCHEDULE triggers use their
   * config, MANUAL triggers are due now, any other type is due in 24 hours.
   */
  private calculateNextRunTime(trigger: TriggerDefinition): Date {
    const now = new Date();
    
    switch (trigger.type) {
      case TriggerType.SCHEDULE:
        return this.calculateScheduleTime(trigger.config as ScheduleTriggerConfig, now);
        
      case TriggerType.MANUAL:
        return now;
        
      default:
        return new Date(now.getTime() + 24 * 60 * 60 * 1000); // default: 24 hours from now
    }
  }
  
  /**
   * Computes the next run time of a SCHEDULE trigger. The first configured
   * option wins, in the order `cron`, `interval`, `at`; with none configured
   * the next run is one hour after `from`.
   *
   * @throws Error if `interval` is negative or `cron` is invalid.
   */
  private calculateScheduleTime(config: ScheduleTriggerConfig, from: Date): Date {
    if (config.cron) {
      return this.parseCron(config.cron, from);
    }
    
    if (config.interval) {
      // A negative interval would make every run immediately due again.
      if (config.interval < 0) {
        throw new Error('无效的调度间隔');
      }
      return new Date(from.getTime() + config.interval);
    }
    
    if (config.at) {
      // Run at the given time, or 24 hours from `from` once it has passed.
      const targetTime = new Date(config.at);
      return targetTime > from ? targetTime : new Date(from.getTime() + 24 * 60 * 60 * 1000);
    }
    
    return new Date(from.getTime() + 60 * 60 * 1000); // default: one hour from now
  }
  
  /**
   * Minimal cron handling; real projects should use a dedicated cron library.
   * Only "0 * * * *" (top of every hour) is understood; any other valid
   * five-field expression yields one hour after `from`.
   *
   * @throws Error if the expression does not have exactly five fields.
   */
  private parseCron(cron: string, from: Date): Date {
    // Tolerate leading/trailing and repeated whitespace between fields.
    const parts = cron.trim().split(/\s+/);
    if (parts.length !== 5) {
      throw new Error('无效的cron表达式');
    }
    
    if (parts.join(' ') === '0 * * * *') {
      const next = new Date(from);
      next.setMinutes(0, 0, 0);
      next.setHours(next.getHours() + 1);
      return next;
    }
    
    return new Date(from.getTime() + 60 * 60 * 1000);
  }
  
  /** Returns `'schedule_'` followed by up to nine random base-36 characters. */
  private generateId(): string {
    return 'schedule_' + Math.random().toString(36).slice(2, 11);
  }
}

/**
 * Configuration of a SCHEDULE trigger. LowCodeWorkflowScheduler checks the
 * options in the order `cron`, `interval`, `at` and uses the first one set.
 */
interface ScheduleTriggerConfig {
  /** Cron expression; the scheduler requires exactly five space-separated fields. */
  cron?: string;
  /** Repeat interval in milliseconds, added to the current time to get the next run. */
  interval?: number;
  /** Target time as a string that the scheduler parses with `new Date(...)`. */
  at?: string;
  // NOTE(review): not read by LowCodeWorkflowScheduler.
  timezone?: string;
}

12.5 事件总线系统

12.5.1 事件总线实现

/** Minimal publish/subscribe contract used by the workflow components. */
interface EventBus {
  /** Registers `handler` for every future emission of `event`. */
  on(event: string, handler: EventHandler): void;
  /** Removes a previously registered `handler` for `event`. */
  off(event: string, handler: EventHandler): void;
  /** Delivers `data` to the handlers registered for `event`. */
  emit(event: string, data: any): void;
  /** Registers `handler` for the next emission of `event` only. */
  once(event: string, handler: EventHandler): void;
  /** Removes the handlers of `event`, or of all events when omitted. */
  removeAllListeners(event?: string): void;
  /** Returns the handlers currently registered for `event`. */
  getListeners(event: string): EventHandler[];
}

/** Event callback; receives the emitted payload and may be synchronous or asynchronous. */
type EventHandler = (data: any) => void | Promise<void>;

/**
 * In-memory {@link EventBus}.
 *
 * Persistent and one-shot handlers are kept in separate per-event sets. A
 * failing handler (synchronous throw or rejected promise) is logged and never
 * prevents the remaining handlers from running.
 */
class LowCodeEventBus implements EventBus {
  private listeners: Map<string, Set<EventHandler>> = new Map();
  private onceListeners: Map<string, Set<EventHandler>> = new Map();
  
  /** Registers a persistent handler; registering the same function twice has no effect. */
  on(event: string, handler: EventHandler): void {
    this.addTo(this.listeners, event, handler);
  }
  
  /** Removes `handler` from both the persistent and the one-shot handlers of `event`. */
  off(event: string, handler: EventHandler): void {
    this.removeFrom(this.listeners, event, handler);
    this.removeFrom(this.onceListeners, event, handler);
  }
  
  /**
   * Invokes the handlers of `event` with `data`: persistent handlers first,
   * then one-shot handlers (which are removed).
   *
   * Both handler lists are snapshotted before anything runs, so calls to
   * `on` / `once` / `off` made by a handler take effect from the next emit.
   * Iterating the live set instead would revisit a handler that removes and
   * re-adds itself, which never terminates.
   */
  emit(event: string, data: any): void {
    const registered = this.listeners.get(event);
    const persistent = registered ? Array.from(registered) : [];
    
    const pendingOnce = this.onceListeners.get(event);
    const oneShot = pendingOnce ? Array.from(pendingOnce) : [];
    // One-shot handlers are consumed by this emit.
    this.onceListeners.delete(event);
    
    for (const handler of persistent) {
      this.invoke(handler, data, `事件处理器执行失败 [${event}]:`);
    }
    
    for (const handler of oneShot) {
      this.invoke(handler, data, `一次性事件处理器执行失败 [${event}]:`);
    }
  }
  
  /** Registers a handler that is invoked for the next emission of `event` only. */
  once(event: string, handler: EventHandler): void {
    this.addTo(this.onceListeners, event, handler);
  }
  
  /** Removes all handlers of `event`, or every handler when `event` is omitted. */
  removeAllListeners(event?: string): void {
    if (event) {
      this.listeners.delete(event);
      this.onceListeners.delete(event);
    } else {
      this.listeners.clear();
      this.onceListeners.clear();
    }
  }
  
  /** Returns the persistent handlers of `event` followed by its one-shot handlers. */
  getListeners(event: string): EventHandler[] {
    const handlers = this.listeners.get(event);
    const onceHandlers = this.onceListeners.get(event);
    
    return [
      ...(handlers ? Array.from(handlers) : []),
      ...(onceHandlers ? Array.from(onceHandlers) : [])
    ];
  }
  
  /** Adds `handler` to the set of `event` in `registry`, creating the set on demand. */
  private addTo(registry: Map<string, Set<EventHandler>>, event: string, handler: EventHandler): void {
    let handlers = registry.get(event);
    if (!handlers) {
      handlers = new Set();
      registry.set(event, handlers);
    }
    handlers.add(handler);
  }
  
  /** Removes `handler` from `registry` and drops the event's set once it is empty. */
  private removeFrom(registry: Map<string, Set<EventHandler>>, event: string, handler: EventHandler): void {
    const handlers = registry.get(event);
    if (!handlers) {
      return;
    }
    handlers.delete(handler);
    if (handlers.size === 0) {
      registry.delete(event);
    }
  }
  
  /**
   * Calls one handler and logs (with `failurePrefix`) a synchronous throw or
   * an asynchronous rejection instead of letting it propagate.
   */
  private invoke(handler: EventHandler, data: any, failurePrefix: string): void {
    try {
      const result: unknown = handler(data);
      // Accept any thenable, not only native Promise instances.
      if (
        typeof result === 'object' &&
        result !== null &&
        typeof (result as PromiseLike<unknown>).then === 'function'
      ) {
        Promise.resolve(result).catch(error => {
          console.error(failurePrefix, error);
        });
      }
    } catch (error) {
      console.error(failurePrefix, error);
    }
  }
}

12.6 工作流存储系统

12.6.1 存储接口和实现

/**
 * Persistence contract for workflow data. Every record type offers the same
 * save / load / delete / list operations; events can only be appended and
 * read back per instance.
 */
interface WorkflowStorage {
  // Workflow definitions
  saveWorkflow(workflow: Workflow): Promise<void>;
  loadWorkflow(id: string): Promise<Workflow | null>;
  deleteWorkflow(id: string): Promise<void>;
  listWorkflows(query?: WorkflowQuery): Promise<Workflow[]>;
  
  // Workflow instances
  saveInstance(instance: WorkflowInstance): Promise<void>;
  loadInstance(id: string): Promise<WorkflowInstance | null>;
  deleteInstance(id: string): Promise<void>;
  listInstances(query?: InstanceQuery): Promise<WorkflowInstance[]>;
  
  // Tasks
  saveTask(task: Task): Promise<void>;
  loadTask(id: string): Promise<Task | null>;
  deleteTask(id: string): Promise<void>;
  listTasks(query?: TaskQuery): Promise<Task[]>;
  
  // Events (stored and loaded per workflow instance)
  saveEvent(event: WorkflowEvent): Promise<void>;
  loadEvents(instanceId: string): Promise<WorkflowEvent[]>;
  
  // Schedules
  saveSchedule(schedule: Schedule): Promise<void>;
  loadSchedule(id: string): Promise<Schedule | null>;
  deleteSchedule(id: string): Promise<void>;
  listSchedules(workflowId?: string): Promise<Schedule[]>;
}

/**
 * In-memory {@link WorkflowStorage} intended for development and tests.
 *
 * Records are copied on the way in and on the way out, so callers never hold
 * the stored object itself. The copies are shallow (for instances, the `steps`
 * map is re-created but its values are shared), so nested objects are still
 * shared with the store.
 */
class MemoryWorkflowStorage implements WorkflowStorage {
  private workflows: Map<string, Workflow> = new Map();
  private instances: Map<string, WorkflowInstance> = new Map();
  private tasks: Map<string, Task> = new Map();
  private events: Map<string, WorkflowEvent[]> = new Map();
  private schedules: Map<string, Schedule> = new Map();
  
  // ---- Workflow definitions ----
  
  /** Stores a copy of the workflow under its id, replacing any previous one. */
  async saveWorkflow(workflow: Workflow): Promise<void> {
    this.workflows.set(workflow.id, { ...workflow });
  }
  
  /** Returns a copy of the workflow, or `null` if the id is unknown. */
  async loadWorkflow(id: string): Promise<Workflow | null> {
    const workflow = this.workflows.get(id);
    return workflow ? { ...workflow } : null;
  }
  
  /** Removes the workflow; unknown ids are ignored. */
  async deleteWorkflow(id: string): Promise<void> {
    this.workflows.delete(id);
  }
  
  /**
   * Lists workflows filtered by `status`, `search` (case-insensitive match on
   * name or description) and `tags` (at least one tag in common), then
   * sorted and paginated as described by the query.
   */
  async listWorkflows(query: WorkflowQuery = {}): Promise<Workflow[]> {
    let workflows = Array.from(this.workflows.values());
    
    if (query.status) {
      workflows = workflows.filter(w => w.status === query.status);
    }
    
    if (query.search) {
      const searchLower = query.search.toLowerCase();
      workflows = workflows.filter(w => 
        w.name.toLowerCase().includes(searchLower) ||
        (w.description && w.description.toLowerCase().includes(searchLower))
      );
    }
    
    if (query.tags && query.tags.length > 0) {
      const wanted = query.tags;
      workflows = workflows.filter(w => {
        const tags = w.tags;
        return tags ? wanted.some(tag => tags.includes(tag)) : false;
      });
    }
    
    return this.sortAndPaginate(workflows, query).map(w => ({ ...w }));
  }
  
  // ---- Workflow instances ----
  
  /** Stores a copy of the instance (with its own `steps` map) under its id. */
  async saveInstance(instance: WorkflowInstance): Promise<void> {
    this.instances.set(instance.id, this.copyInstance(instance));
  }
  
  /** Returns a copy of the instance, or `null` if the id is unknown. */
  async loadInstance(id: string): Promise<WorkflowInstance | null> {
    const instance = this.instances.get(id);
    return instance ? this.copyInstance(instance) : null;
  }
  
  /** Removes the instance; unknown ids are ignored. Stored events are kept. */
  async deleteInstance(id: string): Promise<void> {
    this.instances.delete(id);
  }
  
  /**
   * Lists instances filtered by `workflowId`, `status`, `startedBy` (matched
   * against `context.startedBy`) and `dateRange` (inclusive, on `createdAt`),
   * then sorted and paginated as described by the query.
   */
  async listInstances(query: InstanceQuery = {}): Promise<WorkflowInstance[]> {
    let instances = Array.from(this.instances.values());
    
    if (query.workflowId) {
      instances = instances.filter(i => i.workflowId === query.workflowId);
    }
    
    if (query.status) {
      instances = instances.filter(i => i.status === query.status);
    }
    
    if (query.startedBy) {
      instances = instances.filter(i => i.context.startedBy === query.startedBy);
    }
    
    if (query.dateRange) {
      const range = query.dateRange;
      instances = instances.filter(i => 
        i.createdAt >= range.start && 
        i.createdAt <= range.end
      );
    }
    
    return this.sortAndPaginate(instances, query).map(i => this.copyInstance(i));
  }
  
  // ---- Tasks ----
  
  /** Stores a copy of the task under its id, replacing any previous one. */
  async saveTask(task: Task): Promise<void> {
    this.tasks.set(task.id, { ...task });
  }
  
  /** Returns a copy of the task, or `null` if the id is unknown. */
  async loadTask(id: string): Promise<Task | null> {
    const task = this.tasks.get(id);
    return task ? { ...task } : null;
  }
  
  /** Removes the task; unknown ids are ignored. */
  async deleteTask(id: string): Promise<void> {
    this.tasks.delete(id);
  }
  
  /**
   * Lists tasks filtered by `assignee`, `status`, `type`, `priority` and
   * `dueDate` (tasks due on or before it; tasks without a due date are
   * excluded), then sorted and paginated as described by the query.
   */
  async listTasks(query: TaskQuery = {}): Promise<Task[]> {
    let tasks = Array.from(this.tasks.values());
    
    if (query.assignee) {
      tasks = tasks.filter(t => t.assignee === query.assignee);
    }
    
    if (query.status) {
      tasks = tasks.filter(t => t.status === query.status);
    }
    
    if (query.type) {
      tasks = tasks.filter(t => t.type === query.type);
    }
    
    if (query.priority) {
      tasks = tasks.filter(t => t.priority === query.priority);
    }
    
    if (query.dueDate) {
      const cutoff = query.dueDate;
      tasks = tasks.filter(t => Boolean(t.dueDate && t.dueDate <= cutoff));
    }
    
    return this.sortAndPaginate(tasks, query).map(t => ({ ...t }));
  }
  
  // ---- Events ----
  
  /** Appends a copy of the event to the history of its instance. */
  async saveEvent(event: WorkflowEvent): Promise<void> {
    let history = this.events.get(event.instanceId);
    if (!history) {
      history = [];
      this.events.set(event.instanceId, history);
    }
    history.push({ ...event });
  }
  
  /** Returns copies of the instance's events in insertion order (empty if none). */
  async loadEvents(instanceId: string): Promise<WorkflowEvent[]> {
    const events = this.events.get(instanceId);
    return events ? events.map(e => ({ ...e })) : [];
  }
  
  // ---- Schedules ----
  
  /** Stores a copy of the schedule under its id, replacing any previous one. */
  async saveSchedule(schedule: Schedule): Promise<void> {
    this.schedules.set(schedule.id, { ...schedule });
  }
  
  /** Returns a copy of the schedule, or `null` if the id is unknown. */
  async loadSchedule(id: string): Promise<Schedule | null> {
    const schedule = this.schedules.get(id);
    return schedule ? { ...schedule } : null;
  }
  
  /** Removes the schedule; unknown ids are ignored. */
  async deleteSchedule(id: string): Promise<void> {
    this.schedules.delete(id);
  }
  
  /** Lists copies of all schedules, or only those of `workflowId` when given. */
  async listSchedules(workflowId?: string): Promise<Schedule[]> {
    let schedules = Array.from(this.schedules.values());
    
    if (workflowId) {
      schedules = schedules.filter(s => s.workflowId === workflowId);
    }
    
    return schedules.map(s => ({ ...s }));
  }
  
  // ---- Helpers ----
  
  /** Copies an instance and gives the copy its own `steps` map. */
  private copyInstance(instance: WorkflowInstance): WorkflowInstance {
    return {
      ...instance,
      steps: new Map(instance.steps)
    };
  }
  
  /**
   * Shared sorting and pagination for the list methods.
   *
   * - Sorting: by the `sortBy` property, ascending unless `sortOrder` is
   *   `'desc'`. Equal keys compare as 0 so the comparator is consistent.
   * - Pagination: `offset` is applied whether or not `limit` is given; a
   *   missing or zero `limit` means "no limit".
   *
   * The input array is not modified.
   */
  private sortAndPaginate<T>(
    items: T[],
    query: { sortBy?: string; sortOrder?: string; offset?: number; limit?: number }
  ): T[] {
    let result = items;
    
    if (query.sortBy) {
      const key = query.sortBy;
      const direction = query.sortOrder === 'desc' ? -1 : 1;
      result = [...items].sort((a, b) => {
        const aValue = (a as any)[key];
        const bValue = (b as any)[key];
        
        if (aValue > bValue) {
          return direction;
        }
        if (aValue < bValue) {
          return -direction;
        }
        return 0;
      });
    }
    
    const offset = query.offset || 0;
    if (query.limit) {
      return result.slice(offset, offset + query.limit);
    }
    return offset > 0 ? result.slice(offset) : result;
  }
}

12.7 使用示例

12.7.1 完整的工作流引擎示例

// 工作流引擎与自动化示例
class LowCodeWorkflowDemo {
  private workflowEngine: LowCodeWorkflowEngine;
  private eventBus: LowCodeEventBus;
  private storage: MemoryWorkflowStorage;
  private scheduler: LowCodeWorkflowScheduler;
  
  /**
   * Builds the demo's collaborators (event bus, in-memory storage, scheduler),
   * hands them to a new workflow engine together with a console audit logger,
   * and then registers the demo's event handlers.
   */
  constructor() {
    const eventBus = new LowCodeEventBus();
    const storage = new MemoryWorkflowStorage();
    // The scheduler receives the same event bus instance as the engine.
    const scheduler = new LowCodeWorkflowScheduler(eventBus);

    this.eventBus = eventBus;
    this.storage = storage;
    this.scheduler = scheduler;

    this.workflowEngine = new LowCodeWorkflowEngine({
      eventBus,
      auditLogger: new ConsoleAuditLogger(),
      scheduler,
      storage
    });

    this.setupEventHandlers();
  }
  
  // 设置事件处理器
  /**
   * Subscribes to workflow events on the event bus: four handlers that only
   * log, plus one that starts a workflow instance whenever a trigger fires.
   */
  private setupEventHandlers(): void {
    const bus = this.eventBus;

    // Lifecycle events: log only.
    bus.on('workflow.created', (data) => {
      const { workflow } = data;
      console.log('工作流已创建:', workflow.name);
    });

    bus.on('workflow.instance.started', (data) => {
      const { instanceId } = data;
      console.log('工作流实例已启动:', instanceId);
    });

    bus.on('workflow.instance.completed', (data) => {
      const { instanceId } = data;
      console.log('工作流实例已完成:', instanceId);
    });

    bus.on('workflow.step.completed', (data) => {
      const { stepId, output } = data;
      console.log('步骤已完成:', stepId, '输出:', output);
    });

    // Trigger events: log, then start an instance of the triggered workflow.
    bus.on('workflow.trigger.fired', async (data) => {
      const { scheduleId, workflowId } = data;
      console.log('触发器触发:', scheduleId);

      const input = {
        triggeredBy: 'scheduler',
        scheduleId
      };
      await this.workflowEngine.startWorkflow(workflowId, input);
    });
  }
  
  // 演示基本工作流创建和执行
  /**
   * Creates a leave-request approval workflow, starts one instance of it and
   * schedules two simulated approvals (manager after 2 s, HR after 4 s).
   *
   * The method resolves as soon as the instance has been started; the
   * simulated approvals run later on timers, so callers that want to observe
   * them must keep the process alive for at least four seconds.
   */
  async demonstrateBasicWorkflow(): Promise<void> {
    console.log('\n=== 基本工作流演示 ===');

    // Simple approval workflow definition.
    const workflowDefinition: WorkflowDefinition = {
      name: '请假申请审批流程',
      description: '员工请假申请的审批工作流',
      steps: [
        {
          id: 'start',
          name: '开始',
          type: StepType.START,
          config: {},
          position: { x: 100, y: 100 }
        },
        {
          id: 'validate_request',
          name: '验证申请',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { leaveType, startDate, endDate, reason } = context.input;
              
              // 验证必填字段
              if (!leaveType || !startDate || !endDate || !reason) {
                throw new Error('请填写完整的请假信息');
              }
              
              // 计算请假天数
              const start = new Date(startDate);
              const end = new Date(endDate);
              const days = Math.ceil((end.getTime() - start.getTime()) / (1000 * 60 * 60 * 24));
              
              return {
                validated: true,
                leaveDays: days,
                validatedAt: new Date().toISOString()
              };
            `
          },
          position: { x: 100, y: 200 }
        },
        {
          id: 'manager_approval',
          name: '经理审批',
          type: StepType.APPROVAL,
          config: {
            approver: 'manager@company.com',
            approvalTitle: '请假申请审批',
            approvalDescription: '请审批员工的请假申请',
            approvalOptions: ['approve', 'reject', 'request_more_info'],
            priority: TaskPriority.NORMAL
          },
          position: { x: 100, y: 300 }
        },
        {
          id: 'hr_approval',
          name: 'HR审批',
          type: StepType.APPROVAL,
          config: {
            approver: 'hr@company.com',
            approvalTitle: 'HR最终审批',
            approvalDescription: '请进行HR最终审批',
            approvalOptions: ['approve', 'reject'],
            priority: TaskPriority.HIGH
          },
          condition: '${variables.leaveDays > 3}',
          position: { x: 100, y: 400 }
        },
        {
          id: 'send_notification',
          name: '发送通知',
          type: StepType.EMAIL,
          config: {
            to: '${input.applicant}',
            subject: '请假申请结果通知',
            // The placeholders are escaped (\${...}) so that JavaScript does not
            // interpolate them while this literal is built; they stay in the
            // text as ${...}, the same form used by the single-quoted `to` above.
            body: `
              您的请假申请已处理完成。
              
              申请类型:\${input.leaveType}
              请假时间:\${input.startDate} 至 \${input.endDate}
              审批结果:已批准
              
              请注意安排好工作交接。
            `
          },
          position: { x: 100, y: 500 }
        },
        {
          id: 'end',
          name: '结束',
          type: StepType.END,
          config: {},
          position: { x: 100, y: 600 }
        }
      ],
      connections: [
        { id: 'c1', from: 'start', to: 'validate_request' },
        { id: 'c2', from: 'validate_request', to: 'manager_approval' },
        { id: 'c3', from: 'manager_approval', to: 'hr_approval', condition: '${variables.leaveDays > 3}' },
        { id: 'c4', from: 'manager_approval', to: 'send_notification', condition: '${variables.leaveDays <= 3}' },
        { id: 'c5', from: 'hr_approval', to: 'send_notification' },
        { id: 'c6', from: 'send_notification', to: 'end' }
      ],
      variables: [
        { name: 'leaveDays', type: VariableType.NUMBER, description: '请假天数' },
        { name: 'approvalResult', type: VariableType.STRING, description: '审批结果' }
      ]
    };

    // Create the workflow.
    const workflow = await this.workflowEngine.createWorkflow(workflowDefinition);
    console.log('工作流已创建:', workflow.id);

    // Publish the workflow.
    // NOTE(review): the status is set on the returned object only and
    // updateWorkflow receives the definition — confirm the engine actually
    // persists the published status.
    workflow.status = WorkflowStatus.PUBLISHED;
    await this.workflowEngine.updateWorkflow(workflow.id, workflowDefinition);

    // Start a workflow instance.
    const instance = await this.workflowEngine.startWorkflow(workflow.id, {
      applicant: 'employee@company.com',
      leaveType: '年假',
      startDate: '2024-02-01',
      endDate: '2024-02-05',
      reason: '春节回家探亲'
    }, {
      userId: 'employee123',
      tenantId: 'company1'
    });

    console.log('工作流实例已启动:', instance.id);

    // Simulated manager approval. The rejection handler keeps a failing engine
    // call from surfacing as an unhandled promise rejection inside the timer.
    setTimeout(() => {
      this.simulateApproval('manager@company.com', '经理', '同意请假申请')
        .catch((error: unknown) => {
          console.error('模拟经理审批失败:', error);
        });
    }, 2000);

    // Simulated HR approval.
    setTimeout(() => {
      this.simulateApproval('hr@company.com', 'HR', 'HR最终批准')
        .catch((error: unknown) => {
          console.error('模拟HR审批失败:', error);
        });
    }, 4000);
  }

  /**
   * Approves the first task that is currently assigned to `assignee`.
   * Does nothing when the engine returns no such task.
   *
   * @param assignee  - Address used to query the assignee's tasks.
   * @param roleLabel - Label prefixed to the progress log lines.
   * @param comment   - Comment stored in the task output.
   */
  private async simulateApproval(assignee: string, roleLabel: string, comment: string): Promise<void> {
    const tasks = await this.workflowEngine.getTasks({
      assignee,
      status: TaskStatus.ASSIGNED
    });

    if (tasks.length > 0) {
      const task = tasks[0];
      console.log(`${roleLabel}正在审批任务:`, task.id);

      await this.workflowEngine.completeTask(task.id, {
        decision: 'approve',
        comment,
        approvedAt: new Date().toISOString()
      });

      console.log(`${roleLabel}审批完成`);
    }
  }
  
  // 演示自动化工作流
  /**
   * Creates a data-processing workflow (fetch, process, save, report),
   * registers its enabled triggers with the scheduler and starts one instance
   * manually as a test run.
   */
  async demonstrateAutomatedWorkflow(): Promise<void> {
    console.log('\n=== 自动化工作流演示 ===');

    // Automated data-processing workflow definition.
    const workflowDefinition: WorkflowDefinition = {
      name: '数据处理自动化流程',
      description: '自动处理和分析数据的工作流',
      steps: [
        {
          id: 'start',
          name: '开始',
          type: StepType.START,
          config: {}
        },
        {
          id: 'fetch_data',
          name: '获取数据',
          type: StepType.HTTP,
          config: {
            url: 'https://api.example.com/data',
            method: 'GET',
            headers: {
              'Authorization': 'Bearer ${variables.apiToken}',
              'Content-Type': 'application/json'
            },
            timeout: 30000
          }
        },
        {
          id: 'process_data',
          name: '处理数据',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { data } = context.input;
              
              // 数据处理逻辑
              const processedData = data.map(item => ({
                id: item.id,
                name: item.name,
                value: item.value * 1.1, // 增加10%
                processedAt: new Date().toISOString()
              }));
              
              // 计算统计信息
              const totalValue = processedData.reduce((sum, item) => sum + item.value, 0);
              const averageValue = totalValue / processedData.length;
              
              return {
                processedData,
                statistics: {
                  total: processedData.length,
                  totalValue,
                  averageValue
                }
              };
            `
          }
        },
        {
          id: 'save_results',
          name: '保存结果',
          type: StepType.HTTP,
          config: {
            url: 'https://api.example.com/results',
            method: 'POST',
            headers: {
              'Authorization': 'Bearer ${variables.apiToken}',
              'Content-Type': 'application/json'
            },
            body: {
              data: '${variables.processedData}',
              statistics: '${variables.statistics}',
              timestamp: '${new Date().toISOString()}'
            }
          }
        },
        {
          id: 'send_report',
          name: '发送报告',
          type: StepType.EMAIL,
          config: {
            to: 'admin@company.com',
            subject: '数据处理完成报告',
            // The placeholders are escaped (\${...}) so that JavaScript does not
            // interpolate them while this literal is built (`variables` is not
            // in scope here, and the date would be frozen at definition time).
            // They stay in the text as ${...}, the same form used by the
            // single-quoted values of the save_results step.
            body: `
              数据处理已完成。
              
              处理统计:
              - 处理记录数:\${variables.statistics.total}
              - 总值:\${variables.statistics.totalValue}
              - 平均值:\${variables.statistics.averageValue}
              
              处理时间:\${new Date().toISOString()}
            `
          }
        },
        {
          id: 'end',
          name: '结束',
          type: StepType.END,
          config: {}
        }
      ],
      connections: [
        { id: 'c1', from: 'start', to: 'fetch_data' },
        { id: 'c2', from: 'fetch_data', to: 'process_data' },
        { id: 'c3', from: 'process_data', to: 'save_results' },
        { id: 'c4', from: 'save_results', to: 'send_report' },
        { id: 'c5', from: 'send_report', to: 'end' }
      ],
      triggers: [
        {
          id: 'daily_trigger',
          type: TriggerType.SCHEDULE,
          config: {
            cron: '0 2 * * *', // every day at 02:00
            timezone: 'Asia/Shanghai'
          },
          enabled: true
        }
      ]
    };

    // Create the workflow.
    const workflow = await this.workflowEngine.createWorkflow(workflowDefinition);
    console.log('自动化工作流已创建:', workflow.id);

    // Publish the workflow.
    // NOTE(review): the status is set on the returned object only and
    // updateWorkflow receives the definition — confirm the engine actually
    // persists the published status.
    workflow.status = WorkflowStatus.PUBLISHED;
    await this.workflowEngine.updateWorkflow(workflow.id, workflowDefinition);

    // Register every enabled trigger with the scheduler.
    if (workflow.definition.triggers) {
      for (const trigger of workflow.definition.triggers) {
        if (trigger.enabled) {
          const scheduleId = await this.scheduler.schedule(trigger, workflow.id);
          console.log('调度已设置:', scheduleId);
        }
      }
    }

    // Start one instance manually as a test run.
    const instance = await this.workflowEngine.startWorkflow(workflow.id, {
      apiToken: 'test-token-123'
    });

    console.log('自动化工作流实例已启动:', instance.id);
  }
  
  // 演示并行工作流
  /**
   * Creates a workflow whose `prepare_data` step fans out to three
   * dataset-processing script steps that all feed `merge_results`, then
   * starts one instance of it with an empty input.
   *
   * The fan-out/fan-in shape is expressed purely through `connections`
   * (c2–c4 leave `prepare_data`, c5–c7 enter `merge_results`); how the engine
   * schedules those branches is not visible in this method.
   */
  async demonstrateParallelWorkflow(): Promise<void> {
    console.log('\n=== 并行工作流演示 ===');
    
    // Parallel-processing workflow definition.
    const workflowDefinition: WorkflowDefinition = {
      name: '并行处理工作流',
      description: '同时执行多个任务的并行工作流',
      steps: [
        {
          id: 'start',
          name: '开始',
          type: StepType.START,
          config: {}
        },
        {
          // Produces the three datasets read by the processing steps below.
          id: 'prepare_data',
          name: '准备数据',
          type: StepType.SCRIPT,
          config: {
            script: `
              return {
                dataset1: { name: 'Dataset 1', size: 1000 },
                dataset2: { name: 'Dataset 2', size: 2000 },
                dataset3: { name: 'Dataset 3', size: 1500 }
              };
            `
          }
        },
        {
          // Reads dataset1 from the context variables and returns result1.
          id: 'process_dataset1',
          name: '处理数据集1',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { dataset1 } = context.variables;
              console.log('处理数据集1:', dataset1.name);
              
              // 模拟处理时间
              await new Promise(resolve => setTimeout(resolve, 1000));
              
              return {
                result1: {
                  name: dataset1.name,
                  processed: true,
                  processedSize: dataset1.size,
                  processedAt: new Date().toISOString()
                }
              };
            `
          }
        },
        {
          // Reads dataset2 from the context variables and returns result2.
          id: 'process_dataset2',
          name: '处理数据集2',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { dataset2 } = context.variables;
              console.log('处理数据集2:', dataset2.name);
              
              // 模拟处理时间
              await new Promise(resolve => setTimeout(resolve, 1500));
              
              return {
                result2: {
                  name: dataset2.name,
                  processed: true,
                  processedSize: dataset2.size,
                  processedAt: new Date().toISOString()
                }
              };
            `
          }
        },
        {
          // Reads dataset3 from the context variables and returns result3.
          id: 'process_dataset3',
          name: '处理数据集3',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { dataset3 } = context.variables;
              console.log('处理数据集3:', dataset3.name);
              
              // 模拟处理时间
              await new Promise(resolve => setTimeout(resolve, 800));
              
              return {
                result3: {
                  name: dataset3.name,
                  processed: true,
                  processedSize: dataset3.size,
                  processedAt: new Date().toISOString()
                }
              };
            `
          }
        },
        {
          // Combines result1..result3 into a single mergedResults object.
          id: 'merge_results',
          name: '合并结果',
          type: StepType.SCRIPT,
          config: {
            script: `
              const { result1, result2, result3 } = context.variables;
              
              const mergedResults = {
                totalDatasets: 3,
                totalSize: result1.processedSize + result2.processedSize + result3.processedSize,
                results: [result1, result2, result3],
                mergedAt: new Date().toISOString()
              };
              
              console.log('合并结果完成:', mergedResults);
              
              return { mergedResults };
            `
          }
        },
        {
          id: 'end',
          name: '结束',
          type: StepType.END,
          config: {}
        }
      ],
      connections: [
        { id: 'c1', from: 'start', to: 'prepare_data' },
        { id: 'c2', from: 'prepare_data', to: 'process_dataset1' },
        { id: 'c3', from: 'prepare_data', to: 'process_dataset2' },
        { id: 'c4', from: 'prepare_data', to: 'process_dataset3' },
        { id: 'c5', from: 'process_dataset1', to: 'merge_results' },
        { id: 'c6', from: 'process_dataset2', to: 'merge_results' },
        { id: 'c7', from: 'process_dataset3', to: 'merge_results' },
        { id: 'c8', from: 'merge_results', to: 'end' }
      ]
    };
    
    // Create the workflow.
    const workflow = await this.workflowEngine.createWorkflow(workflowDefinition);
    console.log('并行工作流已创建:', workflow.id);
    
    // Publish the workflow.
    // NOTE(review): the status is set on the returned object only and
    // updateWorkflow receives the definition — confirm the engine actually
    // persists the published status.
    workflow.status = WorkflowStatus.PUBLISHED;
    await this.workflowEngine.updateWorkflow(workflow.id, workflowDefinition);
    
    // Start a workflow instance with an empty input.
    const instance = await this.workflowEngine.startWorkflow(workflow.id, {});
    console.log('并行工作流实例已启动:', instance.id);
  }
  
  // 演示工作流监控
  /**
   * Prints an overview of the engine's current state: every workflow, every
   * instance, every task, and every schedule known to the scheduler.
   */
  async demonstrateWorkflowMonitoring(): Promise<void> {
    console.log('\n=== 工作流监控演示 ===');

    // Workflows
    const workflows = await this.workflowEngine.getWorkflows();
    console.log('工作流列表:');
    for (const workflow of workflows) {
      console.log(`- ${workflow.name} (${workflow.id}) - ${workflow.status}`);
    }

    // Instances
    const instances = await this.workflowEngine.getInstances();
    console.log('\n工作流实例列表:');
    for (const instance of instances) {
      console.log(`- ${instance.id} - ${instance.status} - 当前步骤: ${instance.currentStep}`);
    }

    // Tasks
    const tasks = await this.workflowEngine.getTasks();
    console.log('\n任务列表:');
    for (const task of tasks) {
      console.log(`- ${task.name} (${task.id}) - ${task.status} - 受理人: ${task.assignee}`);
    }

    // Schedules
    const schedules = await this.scheduler.getSchedules();
    console.log('\n调度列表:');
    for (const schedule of schedules) {
      console.log(`- ${schedule.id} - ${schedule.status} - 下次执行: ${schedule.nextRunTime}`);
    }
  }
  
  // 运行所有演示
  /**
   * Runs the four demonstrations in sequence with pauses in between.
   * The scheduler is started first and is always stopped afterwards, even
   * when one of the demonstrations throws.
   */
  async runDemo(): Promise<void> {
    const pause = (ms: number): Promise<void> =>
      new Promise(resolve => setTimeout(resolve, ms));

    console.log('开始工作流引擎与自动化演示...');

    await this.scheduler.start();

    try {
      await this.demonstrateBasicWorkflow();
      await pause(1000);

      await this.demonstrateAutomatedWorkflow();
      await pause(1000);

      await this.demonstrateParallelWorkflow();
      await pause(3000);

      await this.demonstrateWorkflowMonitoring();
    } finally {
      // Stop the scheduler regardless of how the demonstrations ended.
      await this.scheduler.stop();
    }

    console.log('\n工作流引擎与自动化演示完成!');
  }
}

// 简单的审计日志实现
/** Audit logger that writes every entry to the console. */
class ConsoleAuditLogger implements AuditLogger {
  /**
   * Prints one entry as `[AUDIT] <action> - <resourceType>:<resourceId>`,
   * followed by the entry's `details` value as a second console argument.
   *
   * @param entry - Audit entry to print.
   */
  async log(entry: AuditLogEntry): Promise<void> {
    const { action, resourceType, resourceId, details } = entry;
    const headline = `[AUDIT] ${action} - ${resourceType}:${resourceId}`;
    console.log(headline, details);
  }
}

/** Contract for components that record audit entries. */
interface AuditLogger {
  /**
   * Records one audit entry.
   *
   * @param entry - Entry to record.
   * @returns A promise that settles when the implementation has finished.
   */
  log(entry: AuditLogEntry): Promise<void>;
}

/** Shape of a single audit record passed to an `AuditLogger`. */
interface AuditLogEntry {
  /** Name of the audited action. */
  action: string;
  /** Kind of resource the action applies to. */
  resourceType: string;
  /** Identifier of the affected resource. */
  resourceId: string;
  /** Optional free-form payload; its structure is not constrained by this type. */
  details?: any;
  /** Optional user identifier — presumably the acting user; confirm with callers. */
  userId?: string;
  /** Optional timestamp — presumably when the action happened; confirm with callers. */
  timestamp?: Date;
}

// 运行演示
// Entry point: build the demo and run it; a rejected run is reported on stderr.
const demo = new LowCodeWorkflowDemo();
demo.runDemo().catch((error) => {
  console.error(error);
});

12.7.2 Web API 集成示例

// Express.js 集成示例
import express from 'express';
import { LowCodeWorkflowEngine } from './workflow-engine';

const app = express();
app.use(express.json());

/**
 * Turns a caught value into a message string.
 *
 * Caught values are `unknown` under `strict`, so `.message` may only be read
 * after narrowing; anything that is not an `Error` is stringified instead of
 * producing an empty error body.
 */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

// Workflow engine used by all routes below.
// NOTE(review): LowCodeEventBus and ConsoleAuditLogger are not imported in
// this snippet — confirm where they are exported from.
const workflowEngine = new LowCodeWorkflowEngine({
  eventBus: new LowCodeEventBus(),
  auditLogger: new ConsoleAuditLogger()
});

// ---- Workflow management API ----

// Create a workflow from the request body.
app.post('/api/workflows', async (req, res) => {
  try {
    const workflow = await workflowEngine.createWorkflow(req.body);
    res.json(workflow);
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// List workflows; the query string is passed through as the query object.
app.get('/api/workflows', async (req, res) => {
  try {
    const workflows = await workflowEngine.getWorkflows(req.query);
    res.json(workflows);
  } catch (error: unknown) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Fetch one workflow; 404 when the engine returns nothing for the id.
app.get('/api/workflows/:id', async (req, res) => {
  try {
    const workflow = await workflowEngine.getWorkflow(req.params.id);
    if (!workflow) {
      return res.status(404).json({ error: '工作流不存在' });
    }
    res.json(workflow);
  } catch (error: unknown) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Replace a workflow's definition with the request body.
app.put('/api/workflows/:id', async (req, res) => {
  try {
    const workflow = await workflowEngine.updateWorkflow(req.params.id, req.body);
    res.json(workflow);
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Delete a workflow.
app.delete('/api/workflows/:id', async (req, res) => {
  try {
    await workflowEngine.deleteWorkflow(req.params.id);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// ---- Workflow instance API ----

// Start an instance; `input` and `context` are read from the request body.
app.post('/api/workflows/:id/instances', async (req, res) => {
  try {
    const instance = await workflowEngine.startWorkflow(
      req.params.id,
      req.body.input,
      req.body.context
    );
    res.json(instance);
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// List instances; the query string is passed through as the query object.
app.get('/api/instances', async (req, res) => {
  try {
    const instances = await workflowEngine.getInstances(req.query);
    res.json(instances);
  } catch (error: unknown) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Fetch one instance; 404 when the engine returns nothing for the id.
app.get('/api/instances/:id', async (req, res) => {
  try {
    const instance = await workflowEngine.getInstance(req.params.id);
    if (!instance) {
      return res.status(404).json({ error: '工作流实例不存在' });
    }
    res.json(instance);
  } catch (error: unknown) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Pause an instance.
app.post('/api/instances/:id/pause', async (req, res) => {
  try {
    await workflowEngine.pauseWorkflow(req.params.id);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Resume an instance.
app.post('/api/instances/:id/resume', async (req, res) => {
  try {
    await workflowEngine.resumeWorkflow(req.params.id);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Cancel an instance; an optional `reason` is read from the request body.
app.post('/api/instances/:id/cancel', async (req, res) => {
  try {
    await workflowEngine.cancelWorkflow(req.params.id, req.body.reason);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// ---- Task API ----

// List tasks; the query string is passed through as the query object.
app.get('/api/tasks', async (req, res) => {
  try {
    const tasks = await workflowEngine.getTasks(req.query);
    res.json(tasks);
  } catch (error: unknown) {
    res.status(500).json({ error: toErrorMessage(error) });
  }
});

// Assign a task to the `assignee` given in the request body.
app.post('/api/tasks/:id/assign', async (req, res) => {
  try {
    await workflowEngine.assignTask(req.params.id, req.body.assignee);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Complete a task with the `output` given in the request body.
app.post('/api/tasks/:id/complete', async (req, res) => {
  try {
    await workflowEngine.completeTask(req.params.id, req.body.output);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Delegate a task to the `assignee` given in the request body.
app.post('/api/tasks/:id/delegate', async (req, res) => {
  try {
    await workflowEngine.delegateTask(req.params.id, req.body.assignee);
    res.status(204).send();
  } catch (error: unknown) {
    res.status(400).json({ error: toErrorMessage(error) });
  }
});

// Start the HTTP server; PORT falls back to 3000 when unset or empty.
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`工作流引擎服务器运行在端口 ${PORT}`);
});

12.8 小结

本章详细介绍了低代码平台中工作流引擎与自动化系统的设计和实现。我们学习了以下核心内容:

核心要点

  1. 工作流引擎架构:设计了完整的工作流引擎,包括工作流定义管理、实例执行、任务管理和事件处理

  2. 步骤执行器:实现了可扩展的步骤执行器,支持多种步骤类型,包括脚本、HTTP请求、邮件发送、审批等

  3. 工作流调度器:提供了基于时间的工作流调度功能,支持cron表达式和间隔调度

  4. 事件驱动架构:使用事件总线实现松耦合的组件通信,支持工作流生命周期事件

  5. 存储抽象:设计了存储接口,支持工作流定义、实例、任务和事件的持久化

  6. 监控和审计:提供了完整的工作流监控和审计功能,便于跟踪和调试

技术特色

  1. 类型安全:使用TypeScript提供完整的类型定义,确保编译时类型检查

  2. 可扩展性:支持自定义步骤处理器,可以轻松扩展新的步骤类型

  3. 事件驱动:基于事件的架构设计,支持实时监控和响应

  4. 错误处理:提供了完善的错误处理机制,包括重试、补偿和终止策略

  5. 并行执行:支持并行步骤执行,提高工作流处理效率

  6. 灵活调度:支持多种触发方式,包括手动、定时和事件触发

工作流引擎是低代码平台的重要组成部分,它能够自动化复杂的业务流程,提高工作效率。通过本章的学习,你已经掌握了如何设计和实现一个功能完善的工作流引擎系统。

下一章我们将学习API网关与服务集成,了解如何在低代码平台中集成外部服务和API。