diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 707255d2e5..d19f803ee1 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -6,6 +6,7 @@ import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } fr import type { KubeObject } from "./kube-object"; import type { KubeObjectStore } from "../kube-object.store"; +import plimit from "p-limit"; import debounce from "lodash/debounce"; import { comparer, computed, observable, reaction, when } from "mobx"; import { autobind, EventEmitter } from "../utils"; @@ -26,6 +27,7 @@ export interface IKubeWatchMessage { export interface IKubeWatchSubscribeStoreOptions { preload?: boolean; // preload store items, default: true waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true + cacheLoading?: boolean; // when enabled loading store will be skipped, default: false } export interface IKubeWatchReconnectOptions { @@ -103,10 +105,15 @@ export class KubeWatchApi { return this.subscribers.get(api) || 0; } - subscribeApi(api: KubeApi | KubeApi[]) { + isAllowedApi(api: KubeApi): boolean { + return this.cluster.isAllowedResource(api.kind); + } + + subscribeApi(api: KubeApi | KubeApi[]): () => void { const apis: KubeApi[] = [api].flat(); apis.forEach(api => { + if (!this.isAllowedApi(api)) return; // skip this.subscribers.set(api, this.getSubscribersCount(api) + 1); }); @@ -121,52 +128,63 @@ export class KubeWatchApi { } subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { - const { preload = true, waitUntilLoaded = true } = options; - const loading: Promise[] = []; - const disposers: Function[] = []; - let isDisposed = false; - - async function subscribe() { - if (isDisposed) return; - const unsubscribeList = await Promise.all(stores.map(store => store.subscribe())); - - disposers.push(...unsubscribeList); - if (isDisposed) unsubscribe(); - } - - function unsubscribe() { - isDisposed = true; - disposers.forEach(dispose => dispose()); - disposers.length = 0; - } + const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; + const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages + const preloading: Promise[] = []; + const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); + let isUnsubscribed = false; if (preload) { - loading.push(...stores.map(store => store.loadAll(this.getNamespaces()))); + for (const store of stores) { + preloading.push(limitRequests(async () => { + if (cacheLoading && store.isLoaded) return; // skip + + return store.loadAll(this.getNamespaces()); + })); + } } - if (waitUntilLoaded) { - Promise.all(loading).then(subscribe, error => { - this.log({ - message: new Error("Loading stores has failed"), - meta: { stores, error, options }, + const subscribe = () => { + const unsubscribeList: (() => void)[] = []; + + const subscribeApis = () => { + if (isUnsubscribed) return; + apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); + }; + + if (waitUntilLoaded) { + Promise.all(preloading).then(subscribeApis, error => { + this.log({ + message: new Error("Loading stores has failed"), + meta: { stores, error, options }, + }); }); - }); - } else { - subscribe(); - } + } else { + subscribeApis(); + } - return unsubscribe; + // unsubscribe + return () => { + if (isUnsubscribed) return; + isUnsubscribed = true; + limitRequests.clearQueue(); + unsubscribeList.forEach(unsubscribe => unsubscribe()); + }; + }; + + return subscribe(); } - protected connectionCheck() { + protected async connectionCheck() { + if (!this.isConnected) { + this.log({ message: "Offline: reconnecting.." }); + await this.connect(); + } + this.log({ - message: "connection check", + message: `Connection check: ${this.isConnected ? "online" : "offline"}`, meta: { connected: this.isConnected }, }); - - if (this.isConnected) return; - - return this.connect(); } protected async connect(apis = this.apis) { diff --git a/src/renderer/item.store.ts b/src/renderer/item.store.ts index e09c65d43d..eccd2b52df 100644 --- a/src/renderer/item.store.ts +++ b/src/renderer/item.store.ts @@ -167,7 +167,7 @@ export abstract class ItemStore { async removeSelectedItems?(): Promise; // eslint-disable-next-line unused-imports/no-unused-vars-ts - async subscribe(...args: any[]): Promise<() => void> { + subscribe(...args: any[]) { return noop; } diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 1a8710172d..d56e6bd912 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -218,11 +218,8 @@ export abstract class KubeObjectStore extends ItemSt return [this.api]; } - async subscribe(apis = this.getSubscribeApis()): Promise<() => void> { - const cluster = await this.resolveCluster(); - const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind)); - - return kubeWatchApi.subscribeApi(allowedApis); + subscribe(apis = this.getSubscribeApis()) { + return kubeWatchApi.subscribeApi(apis); } @action