mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Fix KubeApi.watch leak on abort (#3629)
* fix KubeApi.watch leak on abort Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * make linter happy Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
parent
90623ed189
commit
016d012439
@ -93,12 +93,6 @@ export class JsonApi<D = JsonApiData, P extends JsonApiParams = JsonApiParams> {
|
|||||||
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
|
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.writeLog({
|
|
||||||
method: reqInit.method.toUpperCase(),
|
|
||||||
reqUrl,
|
|
||||||
reqInit,
|
|
||||||
});
|
|
||||||
|
|
||||||
return fetch(reqUrl, reqInit);
|
return fetch(reqUrl, reqInit);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -131,6 +131,8 @@ export type KubeApiWatchOptions = {
|
|||||||
namespace: string;
|
namespace: string;
|
||||||
callback?: KubeApiWatchCallback;
|
callback?: KubeApiWatchCallback;
|
||||||
abortController?: AbortController
|
abortController?: AbortController
|
||||||
|
watchId?: string;
|
||||||
|
retry?: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
export class KubeApi<T extends KubeObject> {
|
export class KubeApi<T extends KubeObject> {
|
||||||
@ -147,6 +149,7 @@ export class KubeApi<T extends KubeObject> {
|
|||||||
protected request: KubeJsonApi;
|
protected request: KubeJsonApi;
|
||||||
protected resourceVersions = new Map<string, string>();
|
protected resourceVersions = new Map<string, string>();
|
||||||
protected watchDisposer: () => void;
|
protected watchDisposer: () => void;
|
||||||
|
private watchId = 1;
|
||||||
|
|
||||||
constructor(protected options: IKubeApiOptions<T>) {
|
constructor(protected options: IKubeApiOptions<T>) {
|
||||||
const {
|
const {
|
||||||
@ -418,31 +421,40 @@ export class KubeApi<T extends KubeObject> {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void {
|
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
|
||||||
let errorReceived = false;
|
let errorReceived = false;
|
||||||
let timedRetry: NodeJS.Timeout;
|
let timedRetry: NodeJS.Timeout;
|
||||||
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop } = opts;
|
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry } = opts;
|
||||||
|
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
|
||||||
|
|
||||||
signal.addEventListener("abort", () => {
|
signal.addEventListener("abort", () => {
|
||||||
|
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
|
||||||
clearTimeout(timedRetry);
|
clearTimeout(timedRetry);
|
||||||
});
|
});
|
||||||
|
|
||||||
const watchUrl = this.getWatchUrl(namespace);
|
const watchUrl = this.getWatchUrl(namespace);
|
||||||
const responsePromise = this.request.getResponse(watchUrl, null, { signal, timeout: 600_000 });
|
const responsePromise = this.request.getResponse(watchUrl, null, { signal, timeout: 600_000 });
|
||||||
|
|
||||||
|
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
|
||||||
|
|
||||||
responsePromise
|
responsePromise
|
||||||
.then(response => {
|
.then(response => {
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
|
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
|
||||||
|
|
||||||
return callback(null, response);
|
return callback(null, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
["end", "close", "error"].forEach((eventName) => {
|
["end", "close", "error"].forEach((eventName) => {
|
||||||
response.body.on(eventName, () => {
|
response.body.on(eventName, () => {
|
||||||
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
||||||
|
if (signal.aborted) return;
|
||||||
|
|
||||||
|
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
|
||||||
|
|
||||||
clearTimeout(timedRetry);
|
clearTimeout(timedRetry);
|
||||||
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||||
this.watch({ ...opts, namespace, callback });
|
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||||
}, 1000);
|
}, 1000);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@ -465,7 +477,7 @@ export class KubeApi<T extends KubeObject> {
|
|||||||
});
|
});
|
||||||
})
|
})
|
||||||
.catch(error => {
|
.catch(error => {
|
||||||
if (error?.type === "aborted") return; // AbortController rejects, we can ignore it
|
logger.error(`[KUBE-API] watch (${watchId}) throwed ${watchUrl}`, error);
|
||||||
|
|
||||||
callback(null, error);
|
callback(null, error);
|
||||||
});
|
});
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user