diff --git a/src/main/__test__/cluster.test.ts b/src/main/__test__/cluster.test.ts index b3f0442cc2..4b11a19879 100644 --- a/src/main/__test__/cluster.test.ts +++ b/src/main/__test__/cluster.test.ts @@ -126,6 +126,7 @@ describe("create clusters", () => { }; jest.spyOn(Cluster.prototype, "isClusterAdmin").mockReturnValue(Promise.resolve(true)); + jest.spyOn(Cluster.prototype, "canUseWatchApi").mockReturnValue(Promise.resolve(true)); jest.spyOn(Cluster.prototype, "canI") .mockImplementation((attr: V1ResourceAttributes): Promise => { expect(attr.namespace).toBe("default"); diff --git a/src/main/cluster.ts b/src/main/cluster.ts index 956164e10c..4921e84a20 100644 --- a/src/main/cluster.ts +++ b/src/main/cluster.ts @@ -48,6 +48,7 @@ export interface ClusterState { isAdmin: boolean; allowedNamespaces: string[] allowedResources: string[] + isGlobalWatchEnabled: boolean; } /** @@ -91,7 +92,6 @@ export class Cluster implements ClusterModel, ClusterState { */ @observable initializing = false; - /** * Is cluster object initialized * @@ -177,6 +177,12 @@ export class Cluster implements ClusterModel, ClusterState { * @observable */ @observable isAdmin = false; + /** + * Does watch-api could be used for all resources, e.g. "/api/v1/services?watch=1" + * + * @observable + */ + @observable isGlobalWatchEnabled = false; /** * Preferences * @@ -353,9 +359,7 @@ export class Cluster implements ClusterModel, ClusterState { await this.refreshConnectionStatus(); if (this.accessible) { - await this.refreshAllowedResources(); - this.isAdmin = await this.isClusterAdmin(); - this.ready = true; + await this.refreshAccessibility(); this.ensureKubectl(); } this.activated = true; @@ -410,13 +414,11 @@ export class Cluster implements ClusterModel, ClusterState { await this.refreshConnectionStatus(); if (this.accessible) { - this.isAdmin = await this.isClusterAdmin(); - await this.refreshAllowedResources(); + await this.refreshAccessibility(); if (opts.refreshMetadata) { this.refreshMetadata(); } - this.ready = true; } this.pushState(); } @@ -433,6 +435,18 @@ export class Cluster implements ClusterModel, ClusterState { this.metadata = Object.assign(existingMetadata, metadata); } + /** + * @internal + */ + async refreshAccessibility(): Promise { + this.isAdmin = await this.isClusterAdmin(); + this.isGlobalWatchEnabled = await this.canUseWatchApi({ resource: "*" }); + + await this.refreshAllowedResources(); + + this.ready = true; + } + /** * @internal */ @@ -571,6 +585,17 @@ export class Cluster implements ClusterModel, ClusterState { }); } + /** + * @internal + */ + async canUseWatchApi(customizeResource: V1ResourceAttributes = {}): Promise { + return this.canI({ + verb: "watch", + resource: "*", + ...customizeResource, + }); + } + toJSON(): ClusterModel { const model: ClusterModel = { id: this.id, @@ -604,6 +629,7 @@ export class Cluster implements ClusterModel, ClusterState { isAdmin: this.isAdmin, allowedNamespaces: this.allowedNamespaces, allowedResources: this.allowedResources, + isGlobalWatchEnabled: this.isGlobalWatchEnabled, }; return toJS(state, { diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 849f93f56c..813a0f8ccd 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -8,8 +8,8 @@ import type { KubeObjectStore } from "../kube-object.store"; import plimit from "p-limit"; import debounce from "lodash/debounce"; -import { autorun, comparer, computed, observable, reaction } from "mobx"; -import { autobind, EventEmitter } from "../utils"; +import { autorun, comparer, computed, IReactionDisposer, observable, reaction } from "mobx"; +import { autobind, EventEmitter, noop } from "../utils"; import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; @@ -18,6 +18,7 @@ import { apiManager } from "./api-manager"; export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; export interface IKubeWatchMessage { + namespace?: string; data?: IKubeWatchEvent error?: IKubeWatchEvent; api?: KubeApi; @@ -27,7 +28,7 @@ export interface IKubeWatchMessage { export interface IKubeWatchSubscribeStoreOptions { preload?: boolean; // preload store items, default: true waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true - cacheLoading?: boolean; // when enabled loading store will be skipped, default: false + loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false } export interface IKubeWatchReconnectOptions { @@ -69,11 +70,11 @@ export class KubeWatchApi { return []; } - if (api.isNamespaced) { + if (api.isNamespaced && !this.cluster.isGlobalWatchEnabled) { return this.namespaces.map(namespace => api.getWatchUrl(namespace)); - } else { - return api.getWatchUrl(); } + + return api.getWatchUrl(); }).flat(); } @@ -127,45 +128,66 @@ export class KubeWatchApi { }; } - subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { - const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; + preloadStores(stores: KubeObjectStore[], { loadOnce = false } = {}) { const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages const preloading: Promise[] = []; + + for (const store of stores) { + preloading.push(limitRequests(async () => { + if (store.isLoaded && loadOnce) return; // skip + + return store.loadAll(this.namespaces); + })); + } + + return { + loading: Promise.all(preloading), + cancelLoading: () => limitRequests.clearQueue(), + }; + } + + subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { + const { preload = true, waitUntilLoaded = true, loadOnce = false } = options; const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); const unsubscribeList: (() => void)[] = []; let isUnsubscribed = false; + const load = () => this.preloadStores(stores, { loadOnce }); + let preloading = preload && load(); + let cancelReloading: IReactionDisposer = noop; + const subscribe = () => { if (isUnsubscribed) return; apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); }; - if (preload) { - for (const store of stores) { - preloading.push(limitRequests(async () => { - if (cacheLoading && store.isLoaded) return; // skip - - return store.loadAll(this.namespaces); - })); - } - } - - if (waitUntilLoaded) { - Promise.all(preloading).then(subscribe, error => { - this.log({ - message: new Error("Loading stores has failed"), - meta: { stores, error, options }, + if (preloading) { + if (waitUntilLoaded) { + preloading.loading.then(subscribe, error => { + this.log({ + message: new Error("Loading stores has failed"), + meta: { stores, error, options }, + }); }); + } else { + subscribe(); + } + + // reload when context namespaces changes + cancelReloading = reaction(() => this.namespaces, () => { + preloading?.cancelLoading(); + preloading = load(); + }, { + equals: comparer.shallow, }); - } else { - subscribe(); } // unsubscribe return () => { if (isUnsubscribed) return; isUnsubscribed = true; - limitRequests.clearQueue(); + cancelReloading(); + preloading?.cancelLoading(); unsubscribeList.forEach(unsubscribe => unsubscribe()); }; } @@ -252,6 +274,10 @@ export class KubeWatchApi { const kubeEvent: IKubeWatchEvent = JSON.parse(json); const message = this.getMessage(kubeEvent); + if (!this.namespaces.includes(message.namespace)) { + continue; // skip updates from non-watching resources context + } + this.onMessage.emit(message); } catch (error) { return json; @@ -284,6 +310,7 @@ export class KubeWatchApi { message.api = api; message.store = apiManager.getStore(api); + message.namespace = namespace; } break; }