From 5b80dfc70a0185eb65b71c6fa61cd9a726761ddf Mon Sep 17 00:00:00 2001 From: Sebastian Malton Date: Thu, 1 Dec 2022 09:17:18 -0500 Subject: [PATCH] Add support for multiple "runAfter" runnables - Needed so that several dependencies can be declared Signed-off-by: Sebastian Malton --- src/common/runnable/run-many-for.ts | 85 ++++++++++++++++++++--------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/src/common/runnable/run-many-for.ts b/src/common/runnable/run-many-for.ts index ce3123c0b0..6f63160b4c 100644 --- a/src/common/runnable/run-many-for.ts +++ b/src/common/runnable/run-many-for.ts @@ -3,46 +3,81 @@ * Licensed under MIT License. See LICENSE in root directory for more information. */ import type { DiContainerForInjection, InjectionToken } from "@ogre-tools/injectable"; -import type { Composite } from "../utils/composite/get-composite/get-composite"; -import { getCompositeFor } from "../utils/composite/get-composite/get-composite"; +import type { SingleOrMany } from "../utils"; +import { getOrInsertSet, isDefined } from "../utils"; +import { observable, when } from "mobx"; import * as uuid from "uuid"; +import assert from "assert"; export interface Runnable { id: string; run: Run; - runAfter?: Runnable; + runAfter?: SingleOrMany>; } type Run = (parameter: Param) => Promise | void; export type RunMany = (injectionToken: InjectionToken, void>) => Run; -async function runCompositeRunnables(param: Param, composite: Composite>) { - await composite.value.run(param); - await Promise.all(composite.children.map(composite => runCompositeRunnables(param, composite))); -} +const computedNextEdge = (traversed: string[], graph: Map>, currentId: string) => { + const currentNode = graph.get(currentId); + + assert(currentNode, `Runnable graph does not contain node with id="${currentId}"`); + + for (const nextId of currentNode.values()) { + if (traversed.includes(nextId)) { + throw new Error(`Cycle in runnable graph: "${traversed.join(`" -> "`)}" -> "${nextId}"`); + } + + computedNextEdge([...traversed, nextId], graph, nextId); + } +}; + +const verifyRunnablesAreDAG = (runnables: Runnable[]) => { + const rootId = uuid.v4(); + const runnableGraph = new Map>(); + + // Build the Directed graph + for (const runnable of runnables) { + getOrInsertSet(runnableGraph, runnable.id); + + if (!runnable.runAfter) { + getOrInsertSet(runnableGraph, rootId).add(runnable.id); + } else if (Array.isArray(runnable.runAfter)) { + for (const parentRunnable of runnable.runAfter) { + getOrInsertSet(runnableGraph, parentRunnable.id).add(runnable.id); + } + } else { + getOrInsertSet(runnableGraph, runnable.runAfter.id).add(runnable.id); + } + } + + // Do a DFS to find any cycles + computedNextEdge([], runnableGraph, rootId); +}; + +const executeRunnableWith = (param: Param) => { + const finishedIds = observable.set(); + + return async (runnable: Runnable): Promise => { + const parentRunnables = [runnable.runAfter].flat().filter(isDefined); + + for (const parentRunnable of parentRunnables) { + await when(() => finishedIds.has(parentRunnable.id)); + } + + await runnable.run(param); + finishedIds.add(runnable.id); + }; +}; export function runManyFor(di: DiContainerForInjection): RunMany { return (injectionToken: InjectionToken, void>) => async (param: Param) => { + const executeRunnable = executeRunnableWith(param); const allRunnables = di.injectMany(injectionToken); - const rootId = uuid.v4(); - const getCompositeRunnables = getCompositeFor>({ - getId: (runnable) => runnable.id, - getParentId: (runnable) => ( - runnable.id === rootId - ? undefined - : runnable.runAfter?.id ?? rootId - ), - }); - const composite = getCompositeRunnables([ - // This is a dummy runnable to conform to the requirements of `getCompositeFor` to only have one root - { - id: rootId, - run: () => {}, - }, - ...allRunnables, - ]); - await runCompositeRunnables(param, composite); + verifyRunnablesAreDAG(allRunnables); + + await Promise.all(allRunnables.map(executeRunnable)); }; }