1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

use plimit + delay for k8s watch requests

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-26 15:22:11 +02:00
parent 03794e6c38
commit b1fac0d727
3 changed files with 24 additions and 32 deletions

View File

@ -0,0 +1,6 @@
// Create async delay for provided timeout in milliseconds
export async function delay(timeoutMs = 1000) {
if (!timeoutMs) return;
await new Promise(resolve => setTimeout(resolve, timeoutMs));
}

View File

@ -7,6 +7,7 @@ export * from "./autobind";
export * from "./base64";
export * from "./camelCase";
export * from "./cloneJson";
export * from "./delay";
export * from "./debouncePromise";
export * from "./defineGlobal";
export * from "./getRandId";

View File

@ -1,11 +1,12 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import plimit from "p-limit";
import { delay } from "../../common/utils";
import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api";
import { KubeConfig, Watch } from "@kubernetes/client-node";
import { ServerResponse } from "http";
import { Request } from "request";
import { chunk } from "lodash";
import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
@ -106,8 +107,6 @@ class WatchRoute extends LensApi {
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) {
const { response, cluster, payload: { apis } = {} } = request;
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
if (!apis?.length) {
this.respondJson(response, {
@ -123,52 +122,38 @@ class WatchRoute extends LensApi {
response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
// create watcher instances
// limit concurrent k8s requests to avoid possible ECONNRESET-error
const requests = plimit(5);
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watchers.set(apiUrl, watcher);
requests(async () => {
if (isWatchRequestEnded) return;
await watcher.start();
await delay(100);
});
});
// limit concurrent k8s requests to avoid possible ECONNRESET-error
async function startWatches() {
let apiGroupCall: () => Promise<any>;
const apiGroupCalls = chunk(apis, 10).map(apis => {
return () => {
const startedWatches = apis.map(apiUrl => watchers.get(apiUrl).start());
return Promise.allSettled(startedWatches);
};
});
while ((apiGroupCall = apiGroupCalls.shift())) {
try {
if (isWatchRequestEnded) break;
await apiGroupCall();
await new Promise(resolve => setTimeout(resolve, 150)); // delay between watch group calls
} catch (error) {
logger.error(error);
}
}
}
function endWatches() {
function onRequestEnd() {
if (isWatchRequestEnded) return;
isWatchRequestEnded = true;
requests.clearQueue();
watchers.forEach(watcher => watcher.stop());
watchers.clear();
}
startWatches();
request.raw.req.on("end", () => {
logger.info("Watch request end");
endWatches();
onRequestEnd();
});
request.raw.req.on("close", () => {
logger.info("Watch request close");
endWatches();
onRequestEnd();
});
}
}