diff --git a/src/common/runnable/run-many-for.ts b/src/common/runnable/run-many-for.ts index d82ce8d2bd..106cc74da1 100644 --- a/src/common/runnable/run-many-for.ts +++ b/src/common/runnable/run-many-for.ts @@ -4,11 +4,12 @@ */ import type { DiContainerForInjection, InjectionToken } from "@ogre-tools/injectable"; import type { SingleOrMany } from "../utils"; -import { getOrInsertSetFor, isDefined } from "../utils"; -import { observable, when } from "mobx"; +import { getOrInsert, getOrInsertSetFor, isDefined } from "../utils"; import * as uuid from "uuid"; import assert from "assert"; import type { Asyncify } from "type-fest"; +import type TypedEventEmitter from "typed-emitter"; +import EventEmitter from "events"; export interface Runnable { id: string; @@ -80,18 +81,50 @@ const verifyRunnablesAreDAG = (injectionToken: InjectionToken void; +} + +class DynamicBarrier { + private readonly finishedIds = new Map>(); + private readonly events: TypedEventEmitter = new EventEmitter(); + + private initFinishingPromise(id: string): Promise { + return getOrInsert(this.finishedIds, id, new Promise(resolve => { + const handler = (finishedId: string) => { + if (finishedId === id) { + resolve(); + this.events.removeListener("finish", handler); + } + }; + + this.events.addListener("finish", handler); + })); + } + + setFinished(id: string): void { + void this.initFinishingPromise(id); + + this.events.emit("finish", id); + } + + async blockOn(id: string): Promise { + await this.initFinishingPromise(id); + } +} + const executeRunnableWith = (param: Param) => { - const finishedIds = observable.set(); + const barrier = new DynamicBarrier(); return async (runnable: Runnable): Promise => { const parentRunnables = [runnable.runAfter].flat().filter(isDefined); for (const parentRunnable of parentRunnables) { - await when(() => finishedIds.has(parentRunnable.id)); + await barrier.blockOn(parentRunnable.id); } await runnable.run(param); - finishedIds.add(runnable.id); + barrier.setFinished(runnable.id); }; };