From f91ce926660d1f8921684e6c24908c4de6606c8a Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 2 Feb 2021 18:39:38 +0200 Subject: [PATCH] subscribe for watching resources in single request when has admin-like access rights Signed-off-by: Roman --- src/main/cluster.ts | 40 +++++++++++++--- src/renderer/api/kube-watch-api.ts | 73 +++++++++++++++++++++--------- 2 files changed, 85 insertions(+), 28 deletions(-) diff --git a/src/main/cluster.ts b/src/main/cluster.ts index 956164e10c..4a20e585fe 100644 --- a/src/main/cluster.ts +++ b/src/main/cluster.ts @@ -48,6 +48,7 @@ export interface ClusterState { isAdmin: boolean; allowedNamespaces: string[] allowedResources: string[] + watchApiAllEnabled: boolean; } /** @@ -91,7 +92,6 @@ export class Cluster implements ClusterModel, ClusterState { */ @observable initializing = false; - /** * Is cluster object initialized * @@ -177,6 +177,12 @@ export class Cluster implements ClusterModel, ClusterState { * @observable */ @observable isAdmin = false; + /** + * Does watch-api could be used for all resources, e.g. "/api/v1/services?watch=1" + * + * @observable + */ + @observable watchApiAllEnabled = false; /** * Preferences * @@ -353,9 +359,7 @@ export class Cluster implements ClusterModel, ClusterState { await this.refreshConnectionStatus(); if (this.accessible) { - await this.refreshAllowedResources(); - this.isAdmin = await this.isClusterAdmin(); - this.ready = true; + await this.refreshAccessibility(); this.ensureKubectl(); } this.activated = true; @@ -410,13 +414,11 @@ export class Cluster implements ClusterModel, ClusterState { await this.refreshConnectionStatus(); if (this.accessible) { - this.isAdmin = await this.isClusterAdmin(); - await this.refreshAllowedResources(); + await this.refreshAccessibility(); if (opts.refreshMetadata) { this.refreshMetadata(); } - this.ready = true; } this.pushState(); } @@ -433,6 +435,18 @@ export class Cluster implements ClusterModel, ClusterState { this.metadata = Object.assign(existingMetadata, metadata); } + /** + * @internal + */ + async refreshAccessibility(): Promise { + this.isAdmin = await this.isClusterAdmin(); + this.watchApiAllEnabled = await this.canUseWatchApi({ resource: "*" }); + + await this.refreshAllowedResources(); + + this.ready = true; + } + /** * @internal */ @@ -571,6 +585,17 @@ export class Cluster implements ClusterModel, ClusterState { }); } + /** + * @internal + */ + async canUseWatchApi(customizeResource: V1ResourceAttributes = {}): Promise { + return this.canI({ + verb: "watch", + resource: "*", + ...customizeResource, + }); + } + toJSON(): ClusterModel { const model: ClusterModel = { id: this.id, @@ -604,6 +629,7 @@ export class Cluster implements ClusterModel, ClusterState { isAdmin: this.isAdmin, allowedNamespaces: this.allowedNamespaces, allowedResources: this.allowedResources, + watchApiAllEnabled: this.watchApiAllEnabled, }; return toJS(state, { diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 849f93f56c..e874a3d0af 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -8,8 +8,8 @@ import type { KubeObjectStore } from "../kube-object.store"; import plimit from "p-limit"; import debounce from "lodash/debounce"; -import { autorun, comparer, computed, observable, reaction } from "mobx"; -import { autobind, EventEmitter } from "../utils"; +import { autorun, comparer, computed, IReactionDisposer, observable, reaction } from "mobx"; +import { autobind, EventEmitter, noop } from "../utils"; import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; @@ -18,6 +18,7 @@ import { apiManager } from "./api-manager"; export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; export interface IKubeWatchMessage { + namespace?: string; data?: IKubeWatchEvent error?: IKubeWatchEvent; api?: KubeApi; @@ -27,7 +28,7 @@ export interface IKubeWatchMessage { export interface IKubeWatchSubscribeStoreOptions { preload?: boolean; // preload store items, default: true waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true - cacheLoading?: boolean; // when enabled loading store will be skipped, default: false + loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false } export interface IKubeWatchReconnectOptions { @@ -51,6 +52,12 @@ export class KubeWatchApi { @observable subscribers = observable.map(); @observable isConnected = false; + @computed get watchAllEnabled(): boolean { + const { isAdmin, watchApiAllEnabled } = this?.cluster; + + return Boolean(isAdmin || watchApiAllEnabled); + } + @computed get isReady(): boolean { return Boolean(this.cluster && this.namespaces); } @@ -69,11 +76,11 @@ export class KubeWatchApi { return []; } - if (api.isNamespaced) { + if (api.isNamespaced && !this.watchAllEnabled) { return this.namespaces.map(namespace => api.getWatchUrl(namespace)); - } else { - return api.getWatchUrl(); } + + return api.getWatchUrl(); }).flat(); } @@ -127,36 +134,54 @@ export class KubeWatchApi { }; } - subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { - const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; + preloadStores(stores: KubeObjectStore[], { loadOnce = false } = {}) { const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages const preloading: Promise[] = []; + + for (const store of stores) { + preloading.push(limitRequests(async () => { + if (store.isLoaded && loadOnce) return; // skip + + return store.loadAll(this.namespaces); + })); + } + + return { + loading: Promise.all(preloading), + cancelLoading: () => limitRequests.clearQueue(), + }; + } + + subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { + const { preload = true, waitUntilLoaded = true, loadOnce = false } = options; const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); const unsubscribeList: (() => void)[] = []; let isUnsubscribed = false; + const load = () => this.preloadStores(stores, { loadOnce }); + let preloading = preload && load(); + let cancelReloading: IReactionDisposer = noop; + const subscribe = () => { if (isUnsubscribed) return; apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); }; - if (preload) { - for (const store of stores) { - preloading.push(limitRequests(async () => { - if (cacheLoading && store.isLoaded) return; // skip - - return store.loadAll(this.namespaces); - })); - } - } - - if (waitUntilLoaded) { - Promise.all(preloading).then(subscribe, error => { + if (preloading && waitUntilLoaded) { + preloading.loading.then(subscribe, error => { this.log({ message: new Error("Loading stores has failed"), meta: { stores, error, options }, }); }); + + // reload when context namespaces changes + cancelReloading = reaction(() => this.namespaces, () => { + preloading?.cancelLoading(); + preloading = load(); + }, { + equals: comparer.shallow, + }); } else { subscribe(); } @@ -165,7 +190,8 @@ export class KubeWatchApi { return () => { if (isUnsubscribed) return; isUnsubscribed = true; - limitRequests.clearQueue(); + cancelReloading(); + preloading?.cancelLoading(); unsubscribeList.forEach(unsubscribe => unsubscribe()); }; } @@ -252,6 +278,10 @@ export class KubeWatchApi { const kubeEvent: IKubeWatchEvent = JSON.parse(json); const message = this.getMessage(kubeEvent); + if (!this.namespaces.includes(message.namespace)) { + continue; // skip updates from non-watching resources context + } + this.onMessage.emit(message); } catch (error) { return json; @@ -284,6 +314,7 @@ export class KubeWatchApi { message.api = api; message.store = apiManager.getStore(api); + message.namespace = namespace; } break; }