mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Fix KubeApi.watch leak on abort (#3629)
* fix KubeApi.watch leak on abort Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * make linter happy Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
parent
90623ed189
commit
016d012439
@ -93,12 +93,6 @@ export class JsonApi<D = JsonApiData, P extends JsonApiParams = JsonApiParams> {
|
||||
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
|
||||
}
|
||||
|
||||
this.writeLog({
|
||||
method: reqInit.method.toUpperCase(),
|
||||
reqUrl,
|
||||
reqInit,
|
||||
});
|
||||
|
||||
return fetch(reqUrl, reqInit);
|
||||
}
|
||||
|
||||
|
||||
@ -131,6 +131,8 @@ export type KubeApiWatchOptions = {
|
||||
namespace: string;
|
||||
callback?: KubeApiWatchCallback;
|
||||
abortController?: AbortController
|
||||
watchId?: string;
|
||||
retry?: boolean;
|
||||
};
|
||||
|
||||
export class KubeApi<T extends KubeObject> {
|
||||
@ -147,6 +149,7 @@ export class KubeApi<T extends KubeObject> {
|
||||
protected request: KubeJsonApi;
|
||||
protected resourceVersions = new Map<string, string>();
|
||||
protected watchDisposer: () => void;
|
||||
private watchId = 1;
|
||||
|
||||
constructor(protected options: IKubeApiOptions<T>) {
|
||||
const {
|
||||
@ -418,31 +421,40 @@ export class KubeApi<T extends KubeObject> {
|
||||
});
|
||||
}
|
||||
|
||||
watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void {
|
||||
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
|
||||
let errorReceived = false;
|
||||
let timedRetry: NodeJS.Timeout;
|
||||
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop } = opts;
|
||||
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry } = opts;
|
||||
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
|
||||
|
||||
signal.addEventListener("abort", () => {
|
||||
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
|
||||
clearTimeout(timedRetry);
|
||||
});
|
||||
|
||||
const watchUrl = this.getWatchUrl(namespace);
|
||||
const responsePromise = this.request.getResponse(watchUrl, null, { signal, timeout: 600_000 });
|
||||
|
||||
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
|
||||
|
||||
responsePromise
|
||||
.then(response => {
|
||||
if (!response.ok) {
|
||||
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
|
||||
|
||||
return callback(null, response);
|
||||
}
|
||||
|
||||
["end", "close", "error"].forEach((eventName) => {
|
||||
response.body.on(eventName, () => {
|
||||
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
||||
if (signal.aborted) return;
|
||||
|
||||
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
|
||||
|
||||
clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||
this.watch({ ...opts, namespace, callback });
|
||||
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||
}, 1000);
|
||||
});
|
||||
});
|
||||
@ -465,7 +477,7 @@ export class KubeApi<T extends KubeObject> {
|
||||
});
|
||||
})
|
||||
.catch(error => {
|
||||
if (error?.type === "aborted") return; // AbortController rejects, we can ignore it
|
||||
logger.error(`[KUBE-API] watch (${watchId}) throwed ${watchUrl}`, error);
|
||||
|
||||
callback(null, error);
|
||||
});
|
||||
|
||||
Loading…
Reference in New Issue
Block a user