From 082885cabf6eebf9ba93ab15208ef59a470e640b Mon Sep 17 00:00:00 2001 From: Sebastian Malton Date: Thu, 29 Apr 2021 08:32:29 -0400 Subject: [PATCH] Fix KubeObjectStore not correctly tracking loading of namespaces (#2266) --- src/common/utils/index.ts | 2 + src/common/utils/reject-promise.ts | 13 ++++ src/renderer/api/kube-api.ts | 5 ++ src/renderer/kube-object.store.ts | 107 ++++++++++++++--------------- 4 files changed, 73 insertions(+), 54 deletions(-) create mode 100644 src/common/utils/reject-promise.ts diff --git a/src/common/utils/index.ts b/src/common/utils/index.ts index edb2b44d96..9fb0477fe4 100644 --- a/src/common/utils/index.ts +++ b/src/common/utils/index.ts @@ -11,10 +11,12 @@ export * from "./debouncePromise"; export * from "./defineGlobal"; export * from "./delay"; export * from "./disposer"; +export * from "./disposer"; export * from "./downloadFile"; export * from "./escapeRegExp"; export * from "./getRandId"; export * from "./openExternal"; +export * from "./reject-promise"; export * from "./saveToAppFiles"; export * from "./singleton"; export * from "./splitArray"; diff --git a/src/common/utils/reject-promise.ts b/src/common/utils/reject-promise.ts new file mode 100644 index 0000000000..a263ce4489 --- /dev/null +++ b/src/common/utils/reject-promise.ts @@ -0,0 +1,13 @@ +import "abort-controller/polyfill"; + +/** + * Creates a new promise that will be rejected when the signal rejects. + * + * Useful for `Promise.race()` applications. + * @param signal The AbortController's signal to reject with + */ +export function rejectPromiseBy(signal: AbortSignal): Promise { + return new Promise((_, reject) => { + signal.addEventListener("abort", reject); + }); +} diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index 2c0739c6eb..8b0a9421d8 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -50,6 +50,11 @@ export interface IKubeApiQueryParams { fieldSelector?: string | string[]; // restrict list of objects by their fields, e.g. fieldSelector: "field=name" } +export interface KubeApiListOptions { + namespace?: string; + reqInit?: RequestInit; +} + export interface IKubePreferredVersion { preferredVersion?: { version: string; diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index ae1517d79f..35f052d03f 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -1,17 +1,19 @@ import type { ClusterContext } from "./components/context"; import { action, computed, observable, reaction, when } from "mobx"; -import { autobind } from "./utils"; +import { autobind, noop, rejectPromiseBy } from "./utils"; import { KubeObject, KubeStatus } from "./api/kube-object"; import { IKubeWatchEvent } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api"; import { KubeJsonApiData } from "./api/kube-json-api"; +import { Notifications } from "./components/notifications"; export interface KubeObjectStoreLoadingParams { namespaces: string[]; api?: KubeApi; + reqInit?: RequestInit; } @autobind() @@ -21,9 +23,10 @@ export abstract class KubeObjectStore extends ItemSt abstract api: KubeApi; public readonly limit?: number; public readonly bufferSize: number = 50000; - private loadedNamespaces: string[] = []; + @observable private loadedNamespaces?: string[]; contextReady = when(() => Boolean(this.context)); + namespacesReady = when(() => Boolean(this.loadedNamespaces)); constructor() { super(); @@ -103,10 +106,10 @@ export abstract class KubeObjectStore extends ItemSt } } - protected async loadItems({ namespaces, api }: KubeObjectStoreLoadingParams): Promise { + protected async loadItems({ namespaces, api, reqInit }: KubeObjectStoreLoadingParams): Promise { if (this.context?.cluster.isAllowedResource(api.kind)) { if (!api.isNamespaced) { - return api.list({}, this.query); + return api.list({ reqInit }, this.query); } const isLoadingAll = this.context.allNamespaces?.length > 1 @@ -116,13 +119,13 @@ export abstract class KubeObjectStore extends ItemSt if (isLoadingAll) { this.loadedNamespaces = []; - return api.list({}, this.query); + return api.list({ reqInit }, this.query); } else { this.loadedNamespaces = namespaces; return Promise // load resources per namespace - .all(namespaces.map(namespace => api.list({ namespace }))) - .then(items => items.flat()); + .all(namespaces.map(namespace => api.list({ namespace, reqInit }))) + .then(items => items.flat().filter(Boolean)); } } @@ -134,7 +137,7 @@ export abstract class KubeObjectStore extends ItemSt } @action - async loadAll(options: { namespaces?: string[], merge?: boolean } = {}): Promise { + async loadAll(options: { namespaces?: string[], merge?: boolean, reqInit?: RequestInit } = {}): Promise { await this.contextReady; this.isLoading = true; @@ -142,9 +145,10 @@ export abstract class KubeObjectStore extends ItemSt const { namespaces = this.context.allNamespaces, // load all namespaces by default merge = true, // merge loaded items or return as result + reqInit, } = options; - const items = await this.loadItems({ namespaces, api: this.api }); + const items = await this.loadItems({ namespaces, api: this.api, reqInit }); if (merge) { this.mergeItems(items, { replace: false }); @@ -157,7 +161,10 @@ export abstract class KubeObjectStore extends ItemSt return items; } catch (error) { - console.error("Loading store items failed", { error, store: this }); + if (error.message) { + Notifications.error(error.message); + } + console.error("Loading store items failed", { error }); this.resetOnError(error); this.failedLoading = true; } finally { @@ -274,17 +281,21 @@ export abstract class KubeObjectStore extends ItemSt subscribe(apis = this.getSubscribeApis()) { const abortController = new AbortController(); - const namespaces = [...this.loadedNamespaces]; - if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) { - apis.forEach(api => this.watchNamespace(api, "", abortController)); - } else { - apis.forEach(api => { - this.loadedNamespaces.forEach((namespace) => { - this.watchNamespace(api, namespace, abortController); - }); - }); - } + // This waits for the context and namespaces to be ready or fails fast if the disposer is called + Promise.race([rejectPromiseBy(abortController.signal), Promise.all([this.contextReady, this.namespacesReady])]) + .then(() => { + if (this.context.cluster.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) { + apis.forEach(api => this.watchNamespace(api, "", abortController)); + } else { + apis.forEach(api => { + this.loadedNamespaces.forEach((namespace) => { + this.watchNamespace(api, namespace, abortController); + }); + }); + } + }) + .catch(noop); // ignore DOMExceptions return () => { abortController.abort(); @@ -293,48 +304,39 @@ export abstract class KubeObjectStore extends ItemSt private watchNamespace(api: KubeApi, namespace: string, abortController: AbortController) { let timedRetry: NodeJS.Timeout; + const watch = () => api.watch({ + namespace, + abortController, + callback + }); - abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry)); + const { signal } = abortController; const callback = (data: IKubeWatchEvent, error: any) => { - if (!this.isLoaded || abortController.signal.aborted) return; + if (!this.isLoaded || error instanceof DOMException) return; if (error instanceof Response) { if (error.status === 404) { // api has gone, let's not retry return; - } else { // not sure what to do, best to retry - if (timedRetry) clearTimeout(timedRetry); - timedRetry = setTimeout(() => { - api.watch({ - namespace, - abortController, - callback - }); - }, 5000); } + + // not sure what to do, best to retry + clearTimeout(timedRetry); + timedRetry = setTimeout(watch, 5000); } else if (error instanceof KubeStatus && error.code === 410) { - if (timedRetry) clearTimeout(timedRetry); + clearTimeout(timedRetry); // resourceVersion has gone, let's try to reload timedRetry = setTimeout(() => { - (namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => { - api.watch({ - namespace, - abortController, - callback - }); - }); + ( + namespace + ? this.loadAll({ namespaces: [namespace], reqInit: { signal } }) + : this.loadAll({ merge: false, reqInit: { signal } }) + ).then(watch); }, 1000); - } else if(error) { // not sure what to do, best to retry - if (timedRetry) clearTimeout(timedRetry); - - timedRetry = setTimeout(() => { - api.watch({ - namespace, - abortController, - callback - }); - }, 5000); + } else if (error) { // not sure what to do, best to retry + clearTimeout(timedRetry); + timedRetry = setTimeout(watch, 5000); } if (data) { @@ -342,11 +344,8 @@ export abstract class KubeObjectStore extends ItemSt } }; - api.watch({ - namespace, - abortController, - callback: (data, error) => callback(data, error) - }); + signal.addEventListener("abort", () => clearTimeout(timedRetry)); + watch(); } @action