diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index 9174301ee5..7141337241 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -5,6 +5,7 @@ import { LensApi } from "../lens-api"; import { KubeConfig, Watch } from "@kubernetes/client-node"; import { ServerResponse } from "http"; import { Request } from "request"; +import { chunk } from "lodash"; import logger from "../logger"; export interface IKubeWatchEvent { @@ -36,13 +37,12 @@ class ApiWatcher { this.response = response; } - // FIXME: add delay to kube-watch-api requests to avoid possible ECONNRESET error - // https://stackoverflow.com/questions/17245881/how-do-i-debug-error-econnreset-in-node-js public async start() { if (this.processor) { clearInterval(this.processor); } this.processor = setInterval(() => { + if (this.response.finished) return; const events = this.eventBuffer.splice(0); events.map(event => this.sendEvent(event)); @@ -95,12 +95,21 @@ class ApiWatcher { } class WatchRoute extends LensApi { + private response: ServerResponse; + + private setResponse(response: ServerResponse) { + // clean up previous connection and stop all corresponding watch-api requests + // otherwise it happens only by request timeout or something else.. + this.response?.destroy(); + this.response = response; + } public async routeWatch(request: LensApiRequest) { - const { response, cluster, payload } = request; - const watchers: ApiWatcher[] = []; + const { response, cluster, payload: { apis } = {} } = request; + const watchers = new Map(); + let isWatchRequestEnded = false; - if (!payload?.apis?.length) { + if (!apis?.length) { this.respondJson(response, { message: "watch apis list is empty" }, 400); @@ -108,26 +117,58 @@ class WatchRoute extends LensApi { return; } + this.setResponse(response); response.setHeader("Content-Type", "application/json"); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); - payload.apis.forEach(apiUrl => { + // create watcher instances + apis.forEach(apiUrl => { const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); - watcher.start(); - watchers.push(watcher); + watchers.set(apiUrl, watcher); + }); + + // limit concurrent k8s requests to avoid possible ECONNRESET-error + async function startWatches() { + let apiGroupCall: () => Promise; + const apiGroupCalls = chunk(apis, 10).map(apis => { + return () => { + const startedWatches = apis.map(apiUrl => watchers.get(apiUrl).start()); + + return Promise.allSettled(startedWatches); + }; + }); + + while ((apiGroupCall = apiGroupCalls.shift())) { + try { + if (isWatchRequestEnded) break; + await apiGroupCall(); + await new Promise(resolve => setTimeout(resolve, 150)); // delay between watch group calls + } catch (error) { + logger.error(error); + } + } + } + + function endWatches() { + if (isWatchRequestEnded) return; + isWatchRequestEnded = true; + watchers.forEach(watcher => watcher.stop()); + watchers.clear(); + } + + startWatches(); + + request.raw.req.on("end", () => { + logger.info("Watch request end"); + endWatches(); }); request.raw.req.on("close", () => { - logger.info("Watch request closed"); - watchers.map(watcher => watcher.stop()); - }); - - request.raw.req.on("end", () => { - logger.info("Watch request ended"); - watchers.map(watcher => watcher.stop()); + logger.info("Watch request close"); + endWatches(); }); } } diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index f152e7312a..edf1ea89e6 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -31,7 +31,7 @@ export interface IKubeWatchSubscribeStoreOptions { export interface IKubeWatchLog { message: string | Error; - meta?: object | any; + meta?: object; } @autobind() @@ -306,7 +306,7 @@ export class KubeWatchApi { } } - protected log({ message, meta }: IKubeWatchLog) { + protected log({ message, meta = {} }: IKubeWatchLog) { if (isProduction) { return; } @@ -314,11 +314,12 @@ export class KubeWatchApi { const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`; const isError = message instanceof Error; const textStyle = `font-weight: bold; ${isError ? "color: red;" : ""}`; + const time = new Date().toLocaleString(); if (isError) { - console.error(logMessage, textStyle, meta); + console.error(logMessage, textStyle, { time, ...meta }); } else { - console.info(logMessage, textStyle, meta); + console.info(logMessage, textStyle, { time, ...meta }); } } }