1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Fix KubeApi.watch leak on abort (#3629)

* fix KubeApi.watch leak on abort

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* make linter happy

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-08-18 11:50:03 +03:00 committed by GitHub
parent 90623ed189
commit 016d012439
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 10 deletions

View File

@ -93,12 +93,6 @@ export class JsonApi<D = JsonApiData, P extends JsonApiParams = JsonApiParams> {
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
}
this.writeLog({
method: reqInit.method.toUpperCase(),
reqUrl,
reqInit,
});
return fetch(reqUrl, reqInit);
}

View File

@ -131,6 +131,8 @@ export type KubeApiWatchOptions = {
namespace: string;
callback?: KubeApiWatchCallback;
abortController?: AbortController
watchId?: string;
retry?: boolean;
};
export class KubeApi<T extends KubeObject> {
@ -147,6 +149,7 @@ export class KubeApi<T extends KubeObject> {
protected request: KubeJsonApi;
protected resourceVersions = new Map<string, string>();
protected watchDisposer: () => void;
private watchId = 1;
constructor(protected options: IKubeApiOptions<T>) {
const {
@ -418,31 +421,40 @@ export class KubeApi<T extends KubeObject> {
});
}
watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void {
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
let errorReceived = false;
let timedRetry: NodeJS.Timeout;
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop } = opts;
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry } = opts;
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
signal.addEventListener("abort", () => {
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
clearTimeout(timedRetry);
});
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, null, { signal, timeout: 600_000 });
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
responsePromise
.then(response => {
if (!response.ok) {
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
return callback(null, response);
}
["end", "close", "error"].forEach((eventName) => {
response.body.on(eventName, () => {
if (errorReceived) return; // kubernetes errors should be handled in a callback
if (signal.aborted) return;
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
clearTimeout(timedRetry);
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
this.watch({ ...opts, namespace, callback });
this.watch({ ...opts, namespace, callback, watchId, retry: true });
}, 1000);
});
});
@ -465,7 +477,7 @@ export class KubeApi<T extends KubeObject> {
});
})
.catch(error => {
if (error?.type === "aborted") return; // AbortController rejects, we can ignore it
logger.error(`[KUBE-API] watch (${watchId}) throwed ${watchUrl}`, error);
callback(null, error);
});