工作流引擎是低代码平台的核心组件之一,它负责管理和执行复杂的业务流程。本章将详细介绍工作流引擎的设计与实现,包括流程定义、节点管理、流程执行、状态管理等核心功能。
8.1 工作流引擎架构概览
8.1.1 整体架构设计
// 工作流引擎接口
interface WorkflowEngine {
// 流程管理
createWorkflow(definition: WorkflowDefinition): Promise<Workflow>;
getWorkflow(id: string): Promise<Workflow | null>;
updateWorkflow(id: string, definition: WorkflowDefinition): Promise<void>;
deleteWorkflow(id: string): Promise<void>;
// 流程实例管理
startWorkflow(workflowId: string, context: WorkflowContext): Promise<WorkflowInstance>;
getWorkflowInstance(instanceId: string): Promise<WorkflowInstance | null>;
pauseWorkflowInstance(instanceId: string): Promise<void>;
resumeWorkflowInstance(instanceId: string): Promise<void>;
terminateWorkflowInstance(instanceId: string): Promise<void>;
// 任务管理
completeTask(taskId: string, result: TaskResult): Promise<void>;
assignTask(taskId: string, assignee: string): Promise<void>;
// 事件处理
on(event: string, handler: Function): void;
off(event: string, handler: Function): void;
emit(event: string, data: any): void;
}
// 低代码工作流引擎实现
class LowCodeWorkflowEngine implements WorkflowEngine {
private workflows: Map<string, Workflow> = new Map();
private instances: Map<string, WorkflowInstance> = new Map();
private tasks: Map<string, Task> = new Map();
private nodeRegistry: NodeRegistry;
private eventBus: EventBus;
private executionEngine: ExecutionEngine;
private stateManager: WorkflowStateManager;
constructor(options: WorkflowEngineOptions) {
this.nodeRegistry = options.nodeRegistry || new DefaultNodeRegistry();
this.eventBus = options.eventBus || new EventBus();
this.executionEngine = new ExecutionEngine(this.eventBus);
this.stateManager = new WorkflowStateManager();
this.initializeEngine();
}
private initializeEngine(): void {
// 注册内置节点类型
this.registerBuiltInNodes();
// 绑定事件监听
this.bindEvents();
}
async createWorkflow(definition: WorkflowDefinition): Promise<Workflow> {
// 验证工作流定义
this.validateWorkflowDefinition(definition);
// 创建工作流实例
const workflow = new Workflow({
id: definition.id || this.generateId(),
name: definition.name,
description: definition.description,
version: definition.version || '1.0.0',
nodes: definition.nodes,
edges: definition.edges,
variables: definition.variables || {},
settings: definition.settings || {}
});
// 保存工作流
this.workflows.set(workflow.id, workflow);
// 触发事件
this.eventBus.emit('workflow:created', { workflow });
return workflow;
}
async getWorkflow(id: string): Promise<Workflow | null> {
return this.workflows.get(id) || null;
}
async updateWorkflow(id: string, definition: WorkflowDefinition): Promise<void> {
const workflow = this.workflows.get(id);
if (!workflow) {
throw new Error(`Workflow with id ${id} not found`);
}
// 验证工作流定义
this.validateWorkflowDefinition(definition);
// 更新工作流
workflow.update(definition);
// 触发事件
this.eventBus.emit('workflow:updated', { workflow });
}
async deleteWorkflow(id: string): Promise<void> {
const workflow = this.workflows.get(id);
if (!workflow) {
throw new Error(`Workflow with id ${id} not found`);
}
// 检查是否有运行中的实例
const runningInstances = Array.from(this.instances.values())
.filter(instance => instance.workflowId === id && instance.status === WorkflowStatus.RUNNING);
if (runningInstances.length > 0) {
throw new Error('Cannot delete workflow with running instances');
}
// 删除工作流
this.workflows.delete(id);
// 触发事件
this.eventBus.emit('workflow:deleted', { workflowId: id });
}
async startWorkflow(workflowId: string, context: WorkflowContext): Promise<WorkflowInstance> {
const workflow = this.workflows.get(workflowId);
if (!workflow) {
throw new Error(`Workflow with id ${workflowId} not found`);
}
// 创建工作流实例
const instance = new WorkflowInstance({
id: this.generateId(),
workflowId: workflowId,
status: WorkflowStatus.RUNNING,
context: context,
startTime: new Date(),
currentNodes: [],
completedNodes: [],
variables: { ...workflow.variables, ...context.variables }
});
// 保存实例
this.instances.set(instance.id, instance);
// 开始执行
await this.executionEngine.startExecution(workflow, instance);
// 触发事件
this.eventBus.emit('workflow:started', { workflow, instance });
return instance;
}
async getWorkflowInstance(instanceId: string): Promise<WorkflowInstance | null> {
return this.instances.get(instanceId) || null;
}
async pauseWorkflowInstance(instanceId: string): Promise<void> {
const instance = this.instances.get(instanceId);
if (!instance) {
throw new Error(`Workflow instance with id ${instanceId} not found`);
}
if (instance.status !== WorkflowStatus.RUNNING) {
throw new Error('Can only pause running workflow instances');
}
// 暂停实例
instance.status = WorkflowStatus.PAUSED;
instance.pauseTime = new Date();
// 暂停执行引擎
await this.executionEngine.pauseExecution(instanceId);
// 触发事件
this.eventBus.emit('workflow:paused', { instance });
}
async resumeWorkflowInstance(instanceId: string): Promise<void> {
const instance = this.instances.get(instanceId);
if (!instance) {
throw new Error(`Workflow instance with id ${instanceId} not found`);
}
if (instance.status !== WorkflowStatus.PAUSED) {
throw new Error('Can only resume paused workflow instances');
}
// 恢复实例
instance.status = WorkflowStatus.RUNNING;
instance.resumeTime = new Date();
// 恢复执行引擎
await this.executionEngine.resumeExecution(instanceId);
// 触发事件
this.eventBus.emit('workflow:resumed', { instance });
}
async terminateWorkflowInstance(instanceId: string): Promise<void> {
const instance = this.instances.get(instanceId);
if (!instance) {
throw new Error(`Workflow instance with id ${instanceId} not found`);
}
// 终止实例
instance.status = WorkflowStatus.TERMINATED;
instance.endTime = new Date();
// 终止执行引擎
await this.executionEngine.terminateExecution(instanceId);
// 触发事件
this.eventBus.emit('workflow:terminated', { instance });
}
async completeTask(taskId: string, result: TaskResult): Promise<void> {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
// 完成任务
task.status = TaskStatus.COMPLETED;
task.result = result;
task.completedTime = new Date();
// 继续执行工作流
await this.executionEngine.continueExecution(task.instanceId, task.nodeId, result);
// 触发事件
this.eventBus.emit('task:completed', { task, result });
}
async assignTask(taskId: string, assignee: string): Promise<void> {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
// 分配任务
task.assignee = assignee;
task.assignedTime = new Date();
// 触发事件
this.eventBus.emit('task:assigned', { task, assignee });
}
on(event: string, handler: Function): void {
this.eventBus.on(event, handler);
}
off(event: string, handler: Function): void {
this.eventBus.off(event, handler);
}
emit(event: string, data: any): void {
this.eventBus.emit(event, data);
}
private validateWorkflowDefinition(definition: WorkflowDefinition): void {
if (!definition.name) {
throw new Error('Workflow name is required');
}
if (!definition.nodes || definition.nodes.length === 0) {
throw new Error('Workflow must have at least one node');
}
// 验证开始节点
const startNodes = definition.nodes.filter(node => node.type === NodeType.START);
if (startNodes.length !== 1) {
throw new Error('Workflow must have exactly one start node');
}
// 验证结束节点
const endNodes = definition.nodes.filter(node => node.type === NodeType.END);
if (endNodes.length === 0) {
throw new Error('Workflow must have at least one end node');
}
// 验证节点连接
this.validateNodeConnections(definition.nodes, definition.edges);
}
private validateNodeConnections(nodes: WorkflowNode[], edges: WorkflowEdge[]): void {
const nodeIds = new Set(nodes.map(node => node.id));
for (const edge of edges) {
if (!nodeIds.has(edge.sourceId)) {
throw new Error(`Source node ${edge.sourceId} not found`);
}
if (!nodeIds.has(edge.targetId)) {
throw new Error(`Target node ${edge.targetId} not found`);
}
}
}
private registerBuiltInNodes(): void {
// 注册开始节点
this.nodeRegistry.register(NodeType.START, StartNode);
// 注册结束节点
this.nodeRegistry.register(NodeType.END, EndNode);
// 注册用户任务节点
this.nodeRegistry.register(NodeType.USER_TASK, UserTaskNode);
// 注册服务任务节点
this.nodeRegistry.register(NodeType.SERVICE_TASK, ServiceTaskNode);
// 注册网关节点
this.nodeRegistry.register(NodeType.EXCLUSIVE_GATEWAY, ExclusiveGatewayNode);
this.nodeRegistry.register(NodeType.PARALLEL_GATEWAY, ParallelGatewayNode);
// 注册脚本任务节点
this.nodeRegistry.register(NodeType.SCRIPT_TASK, ScriptTaskNode);
}
private bindEvents(): void {
// 监听节点执行完成
this.eventBus.on('node:completed', async (event) => {
await this.handleNodeCompleted(event.instance, event.node, event.result);
});
// 监听节点执行失败
this.eventBus.on('node:failed', async (event) => {
await this.handleNodeFailed(event.instance, event.node, event.error);
});
}
private async handleNodeCompleted(instance: WorkflowInstance, node: WorkflowNode, result: any): Promise<void> {
// 更新实例状态
instance.completedNodes.push(node.id);
instance.currentNodes = instance.currentNodes.filter(id => id !== node.id);
// 检查是否完成
if (node.type === NodeType.END) {
instance.status = WorkflowStatus.COMPLETED;
instance.endTime = new Date();
this.eventBus.emit('workflow:completed', { instance });
}
}
private async handleNodeFailed(instance: WorkflowInstance, node: WorkflowNode, error: Error): Promise<void> {
// 更新实例状态
instance.status = WorkflowStatus.FAILED;
instance.endTime = new Date();
instance.error = error.message;
// 触发事件
this.eventBus.emit('workflow:failed', { instance, error });
}
private generateId(): string {
return 'wf_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9);
}
}
// 工作流引擎配置
interface WorkflowEngineOptions {
nodeRegistry?: NodeRegistry;
eventBus?: EventBus;
persistence?: WorkflowPersistence;
scheduler?: WorkflowScheduler;
}
8.2 核心数据结构
8.2.1 工作流定义
// 工作流定义
interface WorkflowDefinition {
id?: string;
name: string;
description?: string;
version?: string;
nodes: WorkflowNode[];
edges: WorkflowEdge[];
variables?: Record<string, any>;
settings?: WorkflowSettings;
}
// 工作流设置
interface WorkflowSettings {
timeout?: number;
retryPolicy?: RetryPolicy;
errorHandling?: ErrorHandlingPolicy;
notifications?: NotificationSettings;
}
// 重试策略
interface RetryPolicy {
maxRetries: number;
retryDelay: number;
backoffMultiplier?: number;
}
// 错误处理策略
interface ErrorHandlingPolicy {
onError: 'terminate' | 'continue' | 'retry';
errorHandler?: string; // 错误处理器节点ID
}
// 通知设置
interface NotificationSettings {
onStart?: NotificationConfig[];
onComplete?: NotificationConfig[];
onError?: NotificationConfig[];
}
// 通知配置
interface NotificationConfig {
type: 'email' | 'sms' | 'webhook';
recipients: string[];
template?: string;
}
// 工作流类
class Workflow {
public readonly id: string;
public name: string;
public description?: string;
public version: string;
public nodes: WorkflowNode[];
public edges: WorkflowEdge[];
public variables: Record<string, any>;
public settings: WorkflowSettings;
public createdTime: Date;
public updatedTime: Date;
constructor(options: WorkflowOptions) {
this.id = options.id;
this.name = options.name;
this.description = options.description;
this.version = options.version;
this.nodes = options.nodes;
this.edges = options.edges;
this.variables = options.variables;
this.settings = options.settings;
this.createdTime = new Date();
this.updatedTime = new Date();
}
update(definition: WorkflowDefinition): void {
this.name = definition.name;
this.description = definition.description;
this.version = definition.version || this.version;
this.nodes = definition.nodes;
this.edges = definition.edges;
this.variables = definition.variables || {};
this.settings = definition.settings || {};
this.updatedTime = new Date();
}
getNode(nodeId: string): WorkflowNode | undefined {
return this.nodes.find(node => node.id === nodeId);
}
getOutgoingEdges(nodeId: string): WorkflowEdge[] {
return this.edges.filter(edge => edge.sourceId === nodeId);
}
getIncomingEdges(nodeId: string): WorkflowEdge[] {
return this.edges.filter(edge => edge.targetId === nodeId);
}
getStartNode(): WorkflowNode | undefined {
return this.nodes.find(node => node.type === NodeType.START);
}
getEndNodes(): WorkflowNode[] {
return this.nodes.filter(node => node.type === NodeType.END);
}
validate(): ValidationResult {
const errors: string[] = [];
// 验证开始节点
const startNodes = this.nodes.filter(node => node.type === NodeType.START);
if (startNodes.length !== 1) {
errors.push('Workflow must have exactly one start node');
}
// 验证结束节点
const endNodes = this.nodes.filter(node => node.type === NodeType.END);
if (endNodes.length === 0) {
errors.push('Workflow must have at least one end node');
}
// 验证节点连接
for (const node of this.nodes) {
if (node.type !== NodeType.END) {
const outgoingEdges = this.getOutgoingEdges(node.id);
if (outgoingEdges.length === 0) {
errors.push(`Node ${node.id} has no outgoing connections`);
}
}
}
return {
valid: errors.length === 0,
errors
};
}
}
// 工作流选项
interface WorkflowOptions {
id: string;
name: string;
description?: string;
version: string;
nodes: WorkflowNode[];
edges: WorkflowEdge[];
variables: Record<string, any>;
settings: WorkflowSettings;
}
// 验证结果
interface ValidationResult {
valid: boolean;
errors: string[];
}
8.2.2 工作流节点
// 节点类型
enum NodeType {
START = 'start',
END = 'end',
USER_TASK = 'userTask',
SERVICE_TASK = 'serviceTask',
SCRIPT_TASK = 'scriptTask',
EXCLUSIVE_GATEWAY = 'exclusiveGateway',
PARALLEL_GATEWAY = 'parallelGateway',
TIMER = 'timer',
MESSAGE = 'message',
SIGNAL = 'signal'
}
// 工作流节点
interface WorkflowNode {
id: string;
type: NodeType;
name: string;
description?: string;
properties: Record<string, any>;
position: NodePosition;
style?: NodeStyle;
}
// 节点位置
interface NodePosition {
x: number;
y: number;
}
// 节点样式
interface NodeStyle {
width?: number;
height?: number;
backgroundColor?: string;
borderColor?: string;
borderWidth?: number;
borderRadius?: number;
fontSize?: number;
fontColor?: string;
}
// 工作流边
interface WorkflowEdge {
id: string;
sourceId: string;
targetId: string;
condition?: string;
label?: string;
style?: EdgeStyle;
}
// 边样式
interface EdgeStyle {
strokeColor?: string;
strokeWidth?: number;
strokeDasharray?: string;
markerEnd?: string;
}
// 抽象节点基类
abstract class BaseNode {
protected id: string;
protected type: NodeType;
protected name: string;
protected properties: Record<string, any>;
constructor(node: WorkflowNode) {
this.id = node.id;
this.type = node.type;
this.name = node.name;
this.properties = node.properties;
}
abstract async execute(context: ExecutionContext): Promise<ExecutionResult>;
protected getProperty(key: string, defaultValue?: any): any {
return this.properties[key] ?? defaultValue;
}
protected setProperty(key: string, value: any): void {
this.properties[key] = value;
}
}
// 开始节点
class StartNode extends BaseNode {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
return {
success: true,
data: context.variables,
nextNodes: this.getOutgoingNodes(context)
};
}
private getOutgoingNodes(context: ExecutionContext): string[] {
const workflow = context.workflow;
const outgoingEdges = workflow.getOutgoingEdges(this.id);
return outgoingEdges.map(edge => edge.targetId);
}
}
// 脚本任务节点
class ScriptTaskNode extends BaseNode {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
try {
const scriptType = this.getProperty('scriptType', 'javascript');
const script = this.getProperty('script');
const resultVariable = this.getProperty('resultVariable');
if (!script) {
throw new Error('Script is required for script task');
}
// 执行脚本
const result = await this.executeScript(scriptType, script, context.variables);
// 更新变量
if (resultVariable) {
context.variables[resultVariable] = result;
}
return {
success: true,
data: result,
nextNodes: this.getOutgoingNodes(context)
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
private async executeScript(scriptType: string, script: string, variables: Record<string, any>): Promise<any> {
switch (scriptType) {
case 'javascript':
return this.executeJavaScript(script, variables);
case 'python':
return this.executePython(script, variables);
default:
throw new Error(`Unsupported script type: ${scriptType}`);
}
}
private async executeJavaScript(script: string, variables: Record<string, any>): Promise<any> {
// 创建安全的执行环境
const context = {
variables,
console: {
log: (...args: any[]) => console.log('[Script]', ...args)
},
Math,
Date,
JSON
};
// 包装脚本
const wrappedScript = `
(function() {
${script}
})()
`;
try {
// 使用 Function 构造器执行脚本
const func = new Function(...Object.keys(context), `return ${wrappedScript}`);
return func(...Object.values(context));
} catch (error) {
throw new Error(`Script execution failed: ${error.message}`);
}
}
private async executePython(script: string, variables: Record<string, any>): Promise<any> {
// 这里需要集成 Python 执行环境
// 可以使用 Pyodide 或者服务端 Python 执行器
throw new Error('Python script execution not implemented');
}
private getOutgoingNodes(context: ExecutionContext): string[] {
const workflow = context.workflow;
const outgoingEdges = workflow.getOutgoingEdges(this.id);
return outgoingEdges.map(edge => edge.targetId);
}
}
// 排他网关节点
class ExclusiveGatewayNode extends BaseNode {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
try {
const nextNodes = await this.evaluateConditions(context);
if (nextNodes.length === 0) {
// 没有满足条件的路径,使用默认路径
const defaultPath = this.getDefaultPath(context);
if (defaultPath) {
nextNodes.push(defaultPath);
} else {
throw new Error('No condition matched and no default path defined');
}
}
return {
success: true,
data: context.variables,
nextNodes
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
private async evaluateConditions(context: ExecutionContext): Promise<string[]> {
const workflow = context.workflow;
const outgoingEdges = workflow.getOutgoingEdges(this.id);
for (const edge of outgoingEdges) {
if (edge.condition) {
const conditionResult = await this.evaluateCondition(edge.condition, context.variables);
if (conditionResult) {
return [edge.targetId];
}
}
}
return [];
}
private async evaluateCondition(condition: string, variables: Record<string, any>): Promise<boolean> {
try {
// 创建安全的条件评估环境
const context = { variables };
// 包装条件表达式
const wrappedCondition = `
(function() {
with (variables) {
return ${condition};
}
})()
`;
const func = new Function('variables', `return ${wrappedCondition}`);
return Boolean(func(variables));
} catch (error) {
console.warn(`Condition evaluation failed: ${error.message}`);
return false;
}
}
private getDefaultPath(context: ExecutionContext): string | null {
const workflow = context.workflow;
const outgoingEdges = workflow.getOutgoingEdges(this.id);
// 查找没有条件的边作为默认路径
const defaultEdge = outgoingEdges.find(edge => !edge.condition);
return defaultEdge ? defaultEdge.targetId : null;
}
}
// 并行网关节点
class ParallelGatewayNode extends BaseNode {
async execute(context: ExecutionContext): Promise<ExecutionResult> {
const gatewayType = this.getProperty('gatewayType', 'fork'); // fork 或 join
if (gatewayType === 'fork') {
return this.executeFork(context);
} else {
return this.executeJoin(context);
}
}
private async executeFork(context: ExecutionContext): Promise<ExecutionResult> {
// 并行分支:激活所有出口路径
const nextNodes = this.getAllOutgoingNodes(context);
return {
success: true,
data: context.variables,
nextNodes
};
}
private async executeJoin(context: ExecutionContext): Promise<ExecutionResult> {
// 并行汇聚:等待所有入口路径完成
const incomingNodes = this.getAllIncomingNodes(context);
const completedNodes = context.instance.completedNodes;
// 检查是否所有入口节点都已完成
const allCompleted = incomingNodes.every(nodeId => completedNodes.includes(nodeId));
if (allCompleted) {
const nextNodes = this.getAllOutgoingNodes(context);
return {
success: true,
data: context.variables,
nextNodes
};
} else {
// 等待其他分支完成
return {
success: true,
data: context.variables,
waiting: true
};
}
}
private getAllOutgoingNodes(context: ExecutionContext): string[] {
const workflow = context.workflow;
const outgoingEdges = workflow.getOutgoingEdges(this.id);
return outgoingEdges.map(edge => edge.targetId);
}
private getAllIncomingNodes(context: ExecutionContext): string[] {
const workflow = context.workflow;
const incomingEdges = workflow.getIncomingEdges(this.id);
return incomingEdges.map(edge => edge.sourceId);
}
}
// 执行上下文
interface ExecutionContext {
workflow: Workflow;
instance: WorkflowInstance;
variables: Record<string, any>;
engine: WorkflowEngine;
}
// 执行结果
interface ExecutionResult {
success: boolean;
data?: any;
error?: string;
nextNodes?: string[];
waiting?: boolean;
completed?: boolean;
taskId?: string;
}
8.3 工作流实例管理
8.3.1 工作流实例
// 工作流状态
enum WorkflowStatus {
CREATED = 'created',
RUNNING = 'running',
PAUSED = 'paused',
COMPLETED = 'completed',
FAILED = 'failed',
TERMINATED = 'terminated'
}
// 工作流实例
class WorkflowInstance {
public readonly id: string;
public readonly workflowId: string;
public status: WorkflowStatus;
public context: WorkflowContext;
public variables: Record<string, any>;
public currentNodes: string[];
public completedNodes: string[];
public startTime: Date;
public endTime?: Date;
public pauseTime?: Date;
public resumeTime?: Date;
public error?: string;
public metadata: Record<string, any>;
constructor(options: WorkflowInstanceOptions) {
this.id = options.id;
this.workflowId = options.workflowId;
this.status = options.status;
this.context = options.context;
this.variables = options.variables;
this.currentNodes = options.currentNodes;
this.completedNodes = options.completedNodes;
this.startTime = options.startTime;
this.endTime = options.endTime;
this.pauseTime = options.pauseTime;
this.resumeTime = options.resumeTime;
this.error = options.error;
this.metadata = options.metadata || {};
}
isRunning(): boolean {
return this.status === WorkflowStatus.RUNNING;
}
isPaused(): boolean {
return this.status === WorkflowStatus.PAUSED;
}
isCompleted(): boolean {
return this.status === WorkflowStatus.COMPLETED;
}
isFailed(): boolean {
return this.status === WorkflowStatus.FAILED;
}
isTerminated(): boolean {
return this.status === WorkflowStatus.TERMINATED;
}
getDuration(): number {
if (!this.endTime) {
return Date.now() - this.startTime.getTime();
}
return this.endTime.getTime() - this.startTime.getTime();
}
getVariable(name: string): any {
return this.variables[name];
}
setVariable(name: string, value: any): void {
this.variables[name] = value;
}
addCurrentNode(nodeId: string): void {
if (!this.currentNodes.includes(nodeId)) {
this.currentNodes.push(nodeId);
}
}
removeCurrentNode(nodeId: string): void {
this.currentNodes = this.currentNodes.filter(id => id !== nodeId);
}
addCompletedNode(nodeId: string): void {
if (!this.completedNodes.includes(nodeId)) {
this.completedNodes.push(nodeId);
}
}
toJSON(): any {
return {
id: this.id,
workflowId: this.workflowId,
status: this.status,
context: this.context,
variables: this.variables,
currentNodes: this.currentNodes,
completedNodes: this.completedNodes,
startTime: this.startTime,
endTime: this.endTime,
pauseTime: this.pauseTime,
resumeTime: this.resumeTime,
error: this.error,
metadata: this.metadata
};
}
}
// 工作流实例选项
interface WorkflowInstanceOptions {
id: string;
workflowId: string;
status: WorkflowStatus;
context: WorkflowContext;
variables: Record<string, any>;
currentNodes: string[];
completedNodes: string[];
startTime: Date;
endTime?: Date;
pauseTime?: Date;
resumeTime?: Date;
error?: string;
metadata?: Record<string, any>;
}
// 工作流上下文
interface WorkflowContext {
initiator: string;
businessKey?: string;
variables: Record<string, any>;
metadata?: Record<string, any>;
}
8.3.2 任务管理
// 任务类型
enum TaskType {
USER_TASK = 'userTask',
SERVICE_TASK = 'serviceTask',
SCRIPT_TASK = 'scriptTask',
MANUAL_TASK = 'manualTask'
}
// 任务状态
enum TaskStatus {
CREATED = 'created',
ASSIGNED = 'assigned',
IN_PROGRESS = 'inProgress',
COMPLETED = 'completed',
CANCELLED = 'cancelled',
FAILED = 'failed'
}
// 任务优先级
enum TaskPriority {
LOW = 'low',
NORMAL = 'normal',
HIGH = 'high',
URGENT = 'urgent'
}
// 任务接口
interface Task {
id: string;
instanceId: string;
nodeId: string;
type: TaskType;
name: string;
description?: string;
assignee?: string;
candidateUsers: string[];
candidateGroups: string[];
dueDate?: Date;
priority: TaskPriority;
formKey?: string;
variables: Record<string, any>;
status: TaskStatus;
result?: TaskResult;
createdTime: Date;
assignedTime?: Date;
startTime?: Date;
completedTime?: Date;
metadata?: Record<string, any>;
}
// 任务结果
interface TaskResult {
success: boolean;
data?: any;
error?: string;
variables?: Record<string, any>;
}
// 任务管理器
class TaskManager {
private tasks: Map<string, Task> = new Map();
private userTasks: Map<string, string[]> = new Map(); // userId -> taskIds
private groupTasks: Map<string, string[]> = new Map(); // groupId -> taskIds
private eventBus: EventBus;
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
}
createTask(task: Task): void {
this.tasks.set(task.id, task);
// 更新用户任务索引
if (task.assignee) {
this.addUserTask(task.assignee, task.id);
}
// 更新候选用户任务索引
for (const userId of task.candidateUsers) {
this.addUserTask(userId, task.id);
}
// 更新候选组任务索引
for (const groupId of task.candidateGroups) {
this.addGroupTask(groupId, task.id);
}
// 触发事件
this.eventBus.emit('task:created', { task });
}
getTask(taskId: string): Task | undefined {
return this.tasks.get(taskId);
}
getUserTasks(userId: string): Task[] {
const taskIds = this.userTasks.get(userId) || [];
return taskIds.map(id => this.tasks.get(id)).filter(task => task !== undefined) as Task[];
}
getGroupTasks(groupId: string): Task[] {
const taskIds = this.groupTasks.get(groupId) || [];
return taskIds.map(id => this.tasks.get(id)).filter(task => task !== undefined) as Task[];
}
assignTask(taskId: string, assignee: string): void {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
if (task.status !== TaskStatus.CREATED) {
throw new Error('Can only assign created tasks');
}
// 更新任务
task.assignee = assignee;
task.status = TaskStatus.ASSIGNED;
task.assignedTime = new Date();
// 更新索引
this.addUserTask(assignee, taskId);
// 触发事件
this.eventBus.emit('task:assigned', { task, assignee });
}
startTask(taskId: string, userId: string): void {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
if (task.status !== TaskStatus.ASSIGNED && task.status !== TaskStatus.CREATED) {
throw new Error('Can only start assigned or created tasks');
}
// 检查权限
if (!this.canUserAccessTask(userId, task)) {
throw new Error('User does not have permission to start this task');
}
// 更新任务
task.status = TaskStatus.IN_PROGRESS;
task.startTime = new Date();
// 触发事件
this.eventBus.emit('task:started', { task, userId });
}
completeTask(taskId: string, userId: string, result: TaskResult): void {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
if (task.status !== TaskStatus.IN_PROGRESS) {
throw new Error('Can only complete in-progress tasks');
}
// 检查权限
if (!this.canUserAccessTask(userId, task)) {
throw new Error('User does not have permission to complete this task');
}
// 更新任务
task.status = TaskStatus.COMPLETED;
task.result = result;
task.completedTime = new Date();
// 触发事件
this.eventBus.emit('task:completed', { task, userId, result });
}
cancelTask(taskId: string, reason?: string): void {
const task = this.tasks.get(taskId);
if (!task) {
throw new Error(`Task with id ${taskId} not found`);
}
if (task.status === TaskStatus.COMPLETED || task.status === TaskStatus.CANCELLED) {
throw new Error('Cannot cancel completed or already cancelled tasks');
}
// 更新任务
task.status = TaskStatus.CANCELLED;
if (reason) {
task.metadata = { ...task.metadata, cancelReason: reason };
}
// 触发事件
this.eventBus.emit('task:cancelled', { task, reason });
}
queryTasks(query: TaskQuery): Task[] {
let tasks = Array.from(this.tasks.values());
// 按状态过滤
if (query.status) {
tasks = tasks.filter(task => task.status === query.status);
}
// 按分配人过滤
if (query.assignee) {
tasks = tasks.filter(task => task.assignee === query.assignee);
}
// 按候选用户过滤
if (query.candidateUser) {
tasks = tasks.filter(task => task.candidateUsers.includes(query.candidateUser!));
}
// 按候选组过滤
if (query.candidateGroup) {
tasks = tasks.filter(task => task.candidateGroups.includes(query.candidateGroup!));
}
// 按工作流实例过滤
if (query.instanceId) {
tasks = tasks.filter(task => task.instanceId === query.instanceId);
}
// 按到期时间过滤
if (query.dueBefore) {
tasks = tasks.filter(task => task.dueDate && task.dueDate <= query.dueBefore!);
}
if (query.dueAfter) {
tasks = tasks.filter(task => task.dueDate && task.dueDate >= query.dueAfter!);
}
// 排序
if (query.orderBy) {
tasks.sort((a, b) => {
const aValue = this.getTaskProperty(a, query.orderBy!);
const bValue = this.getTaskProperty(b, query.orderBy!);
if (query.orderDirection === 'desc') {
return bValue > aValue ? 1 : -1;
} else {
return aValue > bValue ? 1 : -1;
}
});
}
// 分页
if (query.offset !== undefined && query.limit !== undefined) {
tasks = tasks.slice(query.offset, query.offset + query.limit);
}
return tasks;
}
private canUserAccessTask(userId: string, task: Task): boolean {
// 检查是否是分配人
if (task.assignee === userId) {
return true;
}
// 检查是否是候选用户
if (task.candidateUsers.includes(userId)) {
return true;
}
// 这里可以添加更复杂的权限检查逻辑
// 例如检查用户是否属于候选组
return false;
}
private addUserTask(userId: string, taskId: string): void {
if (!this.userTasks.has(userId)) {
this.userTasks.set(userId, []);
}
const taskIds = this.userTasks.get(userId)!;
if (!taskIds.includes(taskId)) {
taskIds.push(taskId);
}
}
private addGroupTask(groupId: string, taskId: string): void {
if (!this.groupTasks.has(groupId)) {
this.groupTasks.set(groupId, []);
}
const taskIds = this.groupTasks.get(groupId)!;
if (!taskIds.includes(taskId)) {
taskIds.push(taskId);
}
}
private getTaskProperty(task: Task, property: string): any {
switch (property) {
case 'createdTime':
return task.createdTime;
case 'dueDate':
return task.dueDate;
case 'priority':
return this.getPriorityValue(task.priority);
default:
return (task as any)[property];
}
}
private getPriorityValue(priority: TaskPriority): number {
switch (priority) {
case TaskPriority.LOW:
return 1;
case TaskPriority.NORMAL:
return 2;
case TaskPriority.HIGH:
return 3;
case TaskPriority.URGENT:
return 4;
default:
return 0;
}
}
}
// 任务查询
interface TaskQuery {
status?: TaskStatus;
assignee?: string;
candidateUser?: string;
candidateGroup?: string;
instanceId?: string;
dueBefore?: Date;
dueAfter?: Date;
orderBy?: string;
orderDirection?: 'asc' | 'desc';
offset?: number;
limit?: number;
}
8.4 执行引擎
8.4.1 执行引擎实现
// 执行引擎
class ExecutionEngine {
private runningInstances: Map<string, WorkflowInstance> = new Map();
private nodeRegistry: NodeRegistry;
private eventBus: EventBus;
private taskManager: TaskManager;
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.nodeRegistry = new DefaultNodeRegistry();
this.taskManager = new TaskManager(eventBus);
this.bindEvents();
}
async startExecution(workflow: Workflow, instance: WorkflowInstance): Promise<void> {
// 添加到运行实例
this.runningInstances.set(instance.id, instance);
// 获取开始节点
const startNode = workflow.getStartNode();
if (!startNode) {
throw new Error('Workflow has no start node');
}
// 执行开始节点
await this.executeNode(workflow, instance, startNode);
}
async pauseExecution(instanceId: string): Promise<void> {
const instance = this.runningInstances.get(instanceId);
if (!instance) {
throw new Error(`Instance ${instanceId} not found`);
}
// 暂停实例(实际的暂停逻辑由调用方处理)
// 这里主要是清理运行状态
this.runningInstances.delete(instanceId);
}
async resumeExecution(instanceId: string): Promise<void> {
// 恢复执行逻辑
// 需要重新加载实例状态并继续执行
console.log(`Resuming execution for instance ${instanceId}`);
}
async terminateExecution(instanceId: string): Promise<void> {
const instance = this.runningInstances.get(instanceId);
if (!instance) {
return; // 实例不在运行中
}
// 清理运行状态
this.runningInstances.delete(instanceId);
// 取消相关任务
const tasks = this.taskManager.queryTasks({ instanceId });
for (const task of tasks) {
if (task.status !== TaskStatus.COMPLETED && task.status !== TaskStatus.CANCELLED) {
this.taskManager.cancelTask(task.id, 'Workflow terminated');
}
}
}
async continueExecution(instanceId: string, nodeId: string, result: any): Promise<void> {
const instance = this.runningInstances.get(instanceId);
if (!instance) {
throw new Error(`Instance ${instanceId} not found`);
}
// 获取工作流定义
const workflow = await this.getWorkflow(instance.workflowId);
if (!workflow) {
throw new Error(`Workflow ${instance.workflowId} not found`);
}
// 更新实例状态
instance.removeCurrentNode(nodeId);
instance.addCompletedNode(nodeId);
// 获取下一个节点
const nextNodes = this.getNextNodes(workflow, nodeId, result);
// 执行下一个节点
for (const nextNodeId of nextNodes) {
const nextNode = workflow.getNode(nextNodeId);
if (nextNode) {
await this.executeNode(workflow, instance, nextNode);
}
}
}
private async executeNode(workflow: Workflow, instance: WorkflowInstance, node: WorkflowNode): Promise<void> {
try {
// 添加到当前节点
instance.addCurrentNode(node.id);
// 创建执行上下文
const context: ExecutionContext = {
workflow,
instance,
variables: instance.variables,
engine: this as any // 类型转换,实际使用时需要正确的接口
};
// 获取节点执行器
const nodeExecutor = this.nodeRegistry.getNode(node.type);
if (!nodeExecutor) {
throw new Error(`No executor found for node type: ${node.type}`);
}
// 创建节点实例
const nodeInstance = new nodeExecutor(node);
// 执行节点
const result = await nodeInstance.execute(context);
// 处理执行结果
await this.handleExecutionResult(workflow, instance, node, result);
} catch (error) {
// 处理执行错误
await this.handleExecutionError(workflow, instance, node, error as Error);
}
}
private async handleExecutionResult(
workflow: Workflow,
instance: WorkflowInstance,
node: WorkflowNode,
result: ExecutionResult
): Promise<void> {
if (result.success) {
// 触发节点完成事件
this.eventBus.emit('node:completed', { workflow, instance, node, result });
if (result.completed) {
// 工作流完成
instance.status = WorkflowStatus.COMPLETED;
instance.endTime = new Date();
this.runningInstances.delete(instance.id);
this.eventBus.emit('workflow:completed', { workflow, instance });
} else if (result.waiting) {
// 等待外部事件(如用户任务)
if (result.taskId) {
// 任务已创建,等待完成
}
} else if (result.nextNodes) {
// 继续执行下一个节点
for (const nextNodeId of result.nextNodes) {
const nextNode = workflow.getNode(nextNodeId);
if (nextNode) {
await this.executeNode(workflow, instance, nextNode);
}
}
}
} else {
// 执行失败
throw new Error(result.error || 'Node execution failed');
}
}
private async handleExecutionError(
workflow: Workflow,
instance: WorkflowInstance,
node: WorkflowNode,
error: Error
): Promise<void> {
// 更新实例状态
instance.status = WorkflowStatus.FAILED;
instance.endTime = new Date();
instance.error = error.message;
// 清理运行状态
this.runningInstances.delete(instance.id);
// 触发错误事件
this.eventBus.emit('node:failed', { workflow, instance, node, error });
this.eventBus.emit('workflow:failed', { workflow, instance, error });
}
private getNextNodes(workflow: Workflow, nodeId: string, result: any): string[] {
const outgoingEdges = workflow.getOutgoingEdges(nodeId);
// 如果结果中指定了下一个节点,使用结果中的
if (result && result.nextNodes) {
return result.nextNodes;
}
// 否则返回所有出口节点
return outgoingEdges.map(edge => edge.targetId);
}
private async getWorkflow(workflowId: string): Promise<Workflow | null> {
// 这里应该从工作流存储中获取工作流定义
// 简化实现,实际应该注入工作流存储服务
return null;
}
private bindEvents(): void {
// 监听任务完成事件
this.eventBus.on('task:completed', async (event) => {
const { task, result } = event;
await this.continueExecution(task.instanceId, task.nodeId, result);
});
}
addTask(task: Task): void {
this.taskManager.createTask(task);
}
getTaskManager(): TaskManager {
return this.taskManager;
}
}
8.4.2 节点注册器
// 节点注册器接口
interface NodeRegistry {
register(nodeType: NodeType, nodeClass: new (node: WorkflowNode) => BaseNode): void;
getNode(nodeType: NodeType): (new (node: WorkflowNode) => BaseNode) | undefined;
unregister(nodeType: NodeType): void;
getRegisteredTypes(): NodeType[];
}
// 默认节点注册器
class DefaultNodeRegistry implements NodeRegistry {
private nodes: Map<NodeType, new (node: WorkflowNode) => BaseNode> = new Map();
register(nodeType: NodeType, nodeClass: new (node: WorkflowNode) => BaseNode): void {
this.nodes.set(nodeType, nodeClass);
}
getNode(nodeType: NodeType): (new (node: WorkflowNode) => BaseNode) | undefined {
return this.nodes.get(nodeType);
}
unregister(nodeType: NodeType): void {
this.nodes.delete(nodeType);
}
getRegisteredTypes(): NodeType[] {
return Array.from(this.nodes.keys());
}
}
8.5 工作流引擎使用示例
8.5.1 完整使用示例
// 工作流引擎使用示例
class LowCodeWorkflowEngineDemo {
private engine: LowCodeWorkflowEngine;
constructor() {
// 创建事件总线
const eventBus = new EventBus();
// 创建工作流引擎
this.engine = new LowCodeWorkflowEngine({
eventBus
});
this.setupEventListeners();
}
async demo(): Promise<void> {
console.log('=== 低代码工作流引擎演示 ===');
// 1. 创建请假审批工作流
await this.createLeaveApprovalWorkflow();
// 2. 启动工作流实例
await this.startLeaveApprovalProcess();
// 3. 处理用户任务
await this.handleUserTasks();
// 4. 查询工作流状态
await this.queryWorkflowStatus();
}
private async createLeaveApprovalWorkflow(): Promise<void> {
console.log('\n1. 创建请假审批工作流');
const workflowDefinition: WorkflowDefinition = {
id: 'leave_approval',
name: '请假审批流程',
description: '员工请假审批工作流',
version: '1.0.0',
nodes: [
{
id: 'start',
type: NodeType.START,
name: '开始',
properties: {},
position: { x: 100, y: 100 }
},
{
id: 'submit_application',
type: NodeType.USER_TASK,
name: '提交请假申请',
properties: {
taskName: '填写请假申请',
description: '请填写请假申请表单',
formKey: 'leave_application_form',
candidateGroups: ['employees']
},
position: { x: 300, y: 100 }
},
{
id: 'manager_approval',
type: NodeType.USER_TASK,
name: '经理审批',
properties: {
taskName: '审批请假申请',
description: '请审批员工的请假申请',
assignee: '${applicant.manager}',
dueDate: 'P1D' // 1天内完成
},
position: { x: 500, y: 100 }
},
{
id: 'approval_gateway',
type: NodeType.EXCLUSIVE_GATEWAY,
name: '审批结果判断',
properties: {},
position: { x: 700, y: 100 }
},
{
id: 'hr_approval',
type: NodeType.USER_TASK,
name: 'HR审批',
properties: {
taskName: 'HR最终审批',
description: '请进行最终审批',
candidateGroups: ['hr']
},
position: { x: 700, y: 300 }
},
{
id: 'send_notification',
type: NodeType.SERVICE_TASK,
name: '发送通知',
properties: {
serviceClass: 'NotificationService',
method: 'sendLeaveApprovalNotification',
parameters: {
applicantId: '${applicant.id}',
approved: '${approved}',
reason: '${approvalReason}'
}
},
position: { x: 900, y: 200 }
},
{
id: 'end',
type: NodeType.END,
name: '结束',
properties: {},
position: { x: 1100, y: 200 }
}
],
edges: [
{
id: 'edge1',
sourceId: 'start',
targetId: 'submit_application'
},
{
id: 'edge2',
sourceId: 'submit_application',
targetId: 'manager_approval'
},
{
id: 'edge3',
sourceId: 'manager_approval',
targetId: 'approval_gateway'
},
{
id: 'edge4',
sourceId: 'approval_gateway',
targetId: 'hr_approval',
condition: 'leaveDays > 3 && approved === true',
label: '超过3天且经理同意'
},
{
id: 'edge5',
sourceId: 'approval_gateway',
targetId: 'send_notification',
condition: 'leaveDays <= 3 || approved === false',
label: '3天以内或经理拒绝'
},
{
id: 'edge6',
sourceId: 'hr_approval',
targetId: 'send_notification'
},
{
id: 'edge7',
sourceId: 'send_notification',
targetId: 'end'
}
],
variables: {
applicant: null,
leaveDays: 0,
leaveReason: '',
approved: false,
approvalReason: ''
},
settings: {
timeout: 7 * 24 * 60 * 60 * 1000, // 7天超时
retryPolicy: {
maxRetries: 3,
retryDelay: 1000
},
errorHandling: {
onError: 'terminate'
}
}
};
const workflow = await this.engine.createWorkflow(workflowDefinition);
console.log(`工作流创建成功: ${workflow.id}`);
}
private async startLeaveApprovalProcess(): Promise<void> {
console.log('\n2. 启动请假审批流程');
const context: WorkflowContext = {
initiator: 'user123',
businessKey: 'leave_2024_001',
variables: {
applicant: {
id: 'user123',
name: '张三',
manager: 'manager456'
},
leaveDays: 5,
leaveReason: '家庭事务',
leaveStartDate: '2024-01-15',
leaveEndDate: '2024-01-19'
}
};
const instance = await this.engine.startWorkflow('leave_approval', context);
console.log(`工作流实例启动成功: ${instance.id}`);
console.log(`当前状态: ${instance.status}`);
}
private async handleUserTasks(): Promise<void> {
console.log('\n3. 处理用户任务');
// 模拟处理任务
setTimeout(async () => {
// 获取任务管理器
const taskManager = (this.engine as any).executionEngine?.getTaskManager();
if (!taskManager) return;
// 查询待处理任务
const pendingTasks = taskManager.queryTasks({
status: TaskStatus.CREATED
});
console.log(`找到 ${pendingTasks.length} 个待处理任务`);
for (const task of pendingTasks) {
console.log(`处理任务: ${task.name} (${task.id})`);
// 模拟任务完成
const result: TaskResult = {
success: true,
data: {
approved: true,
approvalReason: '同意请假申请'
},
variables: {
approved: true,
approvalReason: '同意请假申请'
}
};
await this.engine.completeTask(task.id, result);
console.log(`任务完成: ${task.id}`);
}
}, 1000);
}
private async queryWorkflowStatus(): Promise<void> {
console.log('\n4. 查询工作流状态');
setTimeout(async () => {
// 这里应该查询实际的工作流实例
console.log('工作流执行状态查询功能演示');
}, 2000);
}
private setupEventListeners(): void {
// 监听工作流事件
this.engine.on('workflow:started', (event) => {
console.log(`[事件] 工作流启动: ${event.instance.id}`);
});
this.engine.on('workflow:completed', (event) => {
console.log(`[事件] 工作流完成: ${event.instance.id}`);
});
this.engine.on('workflow:failed', (event) => {
console.log(`[事件] 工作流失败: ${event.instance.id}, 错误: ${event.error.message}`);
});
this.engine.on('task:created', (event) => {
console.log(`[事件] 任务创建: ${event.task.name} (${event.task.id})`);
});
this.engine.on('task:completed', (event) => {
console.log(`[事件] 任务完成: ${event.task.name} (${event.task.id})`);
});
}
}
// 运行演示
const demo = new LowCodeWorkflowEngineDemo();
demo.demo().catch(console.error);
8.6 小结
本章详细介绍了低代码平台中工作流引擎的设计与实现,涵盖了以下核心内容:
8.6.1 核心要点
工作流引擎架构
- 采用事件驱动的架构设计
- 支持多种节点类型和执行模式
- 提供完整的生命周期管理
节点系统设计
- 抽象节点基类和具体节点实现
- 支持用户任务、服务任务、脚本任务等
- 网关节点实现条件分支和并行处理
实例管理
- 工作流实例的状态管理
- 支持暂停、恢复、终止操作
- 完整的执行上下文和变量管理
任务管理
- 用户任务的创建、分配、执行
- 支持候选用户和候选组
- 任务查询和状态跟踪
8.6.2 技术特色
- 类型安全: 使用 TypeScript 提供完整的类型定义
- 事件驱动: 基于事件总线的松耦合架构
- 可扩展性: 支持自定义节点类型和执行器
- 状态管理: 完整的工作流和任务状态跟踪
- 错误处理: 完善的异常处理和恢复机制
8.6.3 最佳实践
- 工作流设计: 保持流程简洁,避免过度复杂
- 节点实现: 确保节点执行的幂等性
- 错误处理: 实现适当的重试和回滚机制
- 性能优化: 合理使用并行网关提高执行效率
- 监控告警: 建立完善的流程监控体系
8.6.4 扩展方向
- 可视化设计器: 图形化的工作流设计界面
- 流程监控: 实时的流程执行监控和统计
- 版本管理: 工作流定义的版本控制
- 集成能力: 与外部系统的集成接口
- 规则引擎: 更强大的条件判断和规则处理
下一章我们将学习页面生成器与动态页面,了解如何基于配置动态生成完整的页面应用。}]},“query_language”:“Chinese”}}
// 结束节点
class EndNode extends BaseNode {
async execute(context: ExecutionContext): Promise
// 用户任务节点
class UserTaskNode extends BaseNode {
async execute(context: ExecutionContext): Promise
// 等待用户完成任务
return {
success: true,
data: context.variables,
waiting: true,
taskId: task.id
};
}
private async createUserTask(context: ExecutionContext): Promise
// 保存任务
context.engine.addTask(task);
return task;
}
private calculateDueDate(): Date | undefined { const dueDateExpression = this.getProperty(‘dueDate’); if (!dueDateExpression) { return undefined; }
// 解析到期时间表达式
// 例如: "P1D" (1天后), "PT2H" (2小时后)
const now = new Date();
if (dueDateExpression.startsWith('P')) {
// ISO 8601 duration format
return this.parseDuration(dueDateExpression, now);
}
// 直接日期格式
return new Date(dueDateExpression);
}
private parseDuration(duration: string, baseDate: Date): Date { // 简化的 ISO 8601 duration 解析 const result = new Date(baseDate);
const dayMatch = duration.match(/P(\d+)D/);
if (dayMatch) {
result.setDate(result.getDate() + parseInt(dayMatch[1]));
}
const hourMatch = duration.match(/PT(\d+)H/);
if (hourMatch) {
result.setHours(result.getHours() + parseInt(hourMatch[1]));
}
const minuteMatch = duration.match(/PT(\d+)M/);
if (minuteMatch) {
result.setMinutes(result.getMinutes() + parseInt(minuteMatch[1]));
}
return result;
}
private generateTaskId(): string { return ‘task’ + Date.now() + ‘’ + Math.random().toString(36).substr(2, 9); } }
// 服务任务节点
class ServiceTaskNode extends BaseNode {
async execute(context: ExecutionContext): Promise
// 解析参数中的变量
const resolvedParameters = this.resolveParameters(parameters, context.variables);
// 执行服务调用
const result = await this.invokeService(serviceClass, method, resolvedParameters);
// 更新变量
const resultVariable = this.getProperty('resultVariable');
if (resultVariable) {
context.variables[resultVariable] = result;
}
return {
success: true,
data: result,
nextNodes: this.getOutgoingNodes(context)
};
} catch (error) {
return {
success: false,
error: error.message
};
}
}
private resolveParameters(parameters: Record
for (const [key, value] of Object.entries(parameters)) {
if (typeof value === 'string' && value.startsWith('${') && value.endsWith('}')) {
// 变量引用
const variableName = value.slice(2, -1);
resolved[key] = variables[variableName];
} else {
resolved[key] = value;
}
}
return resolved;
}
private async invokeService(serviceClass: string, method: string, parameters: RecordInvoking service: ${serviceClass}.${method}
, parameters);
// 模拟异步服务调用
return new Promise((resolve) => {
setTimeout(() => {
resolve({ success: true, timestamp: new Date() });
}, 100);
});
}
private getOutgoingNodes(context: ExecutionContext): string[] { const workflow = context.workflow; const outgoingEdges = workflow.getOutgoingEdges(this.id); return outgoingEdges.map(edge => edge.targetId); } } “`