From b1fac0d727ad84613aab525d7d605dc21837f627 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 26 Jan 2021 15:22:11 +0200 Subject: [PATCH] use `plimit` + delay for k8s watch requests Signed-off-by: Roman --- src/common/utils/delay.ts | 6 +++++ src/common/utils/index.ts | 1 + src/main/routes/watch-route.ts | 49 ++++++++++++---------------------- 3 files changed, 24 insertions(+), 32 deletions(-) create mode 100644 src/common/utils/delay.ts diff --git a/src/common/utils/delay.ts b/src/common/utils/delay.ts new file mode 100644 index 0000000000..208e042759 --- /dev/null +++ b/src/common/utils/delay.ts @@ -0,0 +1,6 @@ +// Create async delay for provided timeout in milliseconds + +export async function delay(timeoutMs = 1000) { + if (!timeoutMs) return; + await new Promise(resolve => setTimeout(resolve, timeoutMs)); +} diff --git a/src/common/utils/index.ts b/src/common/utils/index.ts index 582135d7f0..942c675f0a 100644 --- a/src/common/utils/index.ts +++ b/src/common/utils/index.ts @@ -7,6 +7,7 @@ export * from "./autobind"; export * from "./base64"; export * from "./camelCase"; export * from "./cloneJson"; +export * from "./delay"; export * from "./debouncePromise"; export * from "./defineGlobal"; export * from "./getRandId"; diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index 7141337241..658b1588b9 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -1,11 +1,12 @@ import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api"; +import plimit from "p-limit"; +import { delay } from "../../common/utils"; import { LensApiRequest } from "../router"; import { LensApi } from "../lens-api"; import { KubeConfig, Watch } from "@kubernetes/client-node"; import { ServerResponse } from "http"; import { Request } from "request"; -import { chunk } from "lodash"; import logger from "../logger"; export interface IKubeWatchEvent { @@ -106,8 +107,6 @@ class WatchRoute extends LensApi { public async routeWatch(request: LensApiRequest) { const { response, cluster, payload: { apis } = {} } = request; - const watchers = new Map(); - let isWatchRequestEnded = false; if (!apis?.length) { this.respondJson(response, { @@ -123,52 +122,38 @@ class WatchRoute extends LensApi { response.setHeader("Connection", "keep-alive"); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); - // create watcher instances + // limit concurrent k8s requests to avoid possible ECONNRESET-error + const requests = plimit(5); + const watchers = new Map(); + let isWatchRequestEnded = false; + apis.forEach(apiUrl => { const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); - watchers.set(apiUrl, watcher); + + requests(async () => { + if (isWatchRequestEnded) return; + await watcher.start(); + await delay(100); + }); }); - // limit concurrent k8s requests to avoid possible ECONNRESET-error - async function startWatches() { - let apiGroupCall: () => Promise; - const apiGroupCalls = chunk(apis, 10).map(apis => { - return () => { - const startedWatches = apis.map(apiUrl => watchers.get(apiUrl).start()); - - return Promise.allSettled(startedWatches); - }; - }); - - while ((apiGroupCall = apiGroupCalls.shift())) { - try { - if (isWatchRequestEnded) break; - await apiGroupCall(); - await new Promise(resolve => setTimeout(resolve, 150)); // delay between watch group calls - } catch (error) { - logger.error(error); - } - } - } - - function endWatches() { + function onRequestEnd() { if (isWatchRequestEnded) return; isWatchRequestEnded = true; + requests.clearQueue(); watchers.forEach(watcher => watcher.stop()); watchers.clear(); } - startWatches(); - request.raw.req.on("end", () => { logger.info("Watch request end"); - endWatches(); + onRequestEnd(); }); request.raw.req.on("close", () => { logger.info("Watch request close"); - endWatches(); + onRequestEnd(); }); } }