From bbf9b7af3200f81ceaabe545a0f06ce3eacb72a7 Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 28 Jan 2021 17:34:52 +0200 Subject: [PATCH] kubeWatchApi refactoring / fine-tuning Signed-off-by: Roman --- src/renderer/api/kube-watch-api.ts | 90 ++++++++++++++++++------------ src/renderer/item.store.ts | 2 +- src/renderer/kube-object.store.ts | 7 +-- 3 files changed, 57 insertions(+), 42 deletions(-) diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index ea451b4bf7..5947468010 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -7,6 +7,7 @@ import type { KubeObject } from "./kube-object"; import type { KubeObjectStore } from "../kube-object.store"; import type { NamespaceStore } from "../components/+namespaces/namespace.store"; +import plimit from "p-limit"; import debounce from "lodash/debounce"; import { comparer, computed, observable, reaction } from "mobx"; import { autobind, EventEmitter } from "../utils"; @@ -27,6 +28,7 @@ export interface IKubeWatchMessage { export interface IKubeWatchSubscribeStoreOptions { preload?: boolean; // preload store items, default: true waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true + cacheLoading?: boolean; // when enabled loading store will be skipped, default: false } export interface IKubeWatchReconnectOptions { @@ -105,10 +107,15 @@ export class KubeWatchApi { return this.subscribers.get(api) || 0; } - subscribeApi(api: KubeApi | KubeApi[]) { + isAllowedApi(api: KubeApi): boolean { + return this.cluster.isAllowedResource(api.kind); + } + + subscribeApi(api: KubeApi | KubeApi[]): () => void { const apis: KubeApi[] = [api].flat(); apis.forEach(api => { + if (!this.isAllowedApi(api)) return; // skip this.subscribers.set(api, this.getSubscribersCount(api) + 1); }); @@ -123,52 +130,63 @@ export class KubeWatchApi { } subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { - const { preload = true, waitUntilLoaded = true } = options; - const loading: Promise[] = []; - const disposers: Function[] = []; - let isDisposed = false; - - async function subscribe() { - if (isDisposed) return; - const unsubscribeList = await Promise.all(stores.map(store => store.subscribe())); - - disposers.push(...unsubscribeList); - if (isDisposed) unsubscribe(); - } - - function unsubscribe() { - isDisposed = true; - disposers.forEach(dispose => dispose()); - disposers.length = 0; - } + const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; + const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages + const preloading: Promise[] = []; + const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); + let isUnsubscribed = false; if (preload) { - loading.push(...stores.map(store => store.loadAll())); + for (const store of stores) { + preloading.push(limitRequests(async () => { + if (cacheLoading && store.isLoaded) return; // skip + + return store.loadAll(); + })); + } } - if (waitUntilLoaded) { - Promise.all(loading).then(subscribe, error => { - this.log({ - message: new Error("Loading stores has failed"), - meta: { stores, error, options }, + const subscribe = () => { + const unsubscribeList: (() => void)[] = []; + + const subscribeApis = () => { + if (isUnsubscribed) return; + apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); + }; + + if (waitUntilLoaded) { + Promise.all(preloading).then(subscribeApis, error => { + this.log({ + message: new Error("Loading stores has failed"), + meta: { stores, error, options }, + }); }); - }); - } else { - subscribe(); - } + } else { + subscribeApis(); + } - return unsubscribe; + // unsubscribe + return () => { + if (isUnsubscribed) return; + isUnsubscribed = true; + limitRequests.clearQueue(); + unsubscribeList.forEach(unsubscribe => unsubscribe()); + }; + }; + + return subscribe(); } - protected connectionCheck() { + protected async connectionCheck() { + if (!this.isConnected) { + this.log({ message: "Offline: reconnecting.." }); + await this.connect(); + } + this.log({ - message: "connection check", + message: `Connection check: ${this.isConnected ? "online" : "offline"}`, meta: { connected: this.isConnected }, }); - - if (this.isConnected) return; - - return this.connect(); } protected async connect(apis = this.apis) { diff --git a/src/renderer/item.store.ts b/src/renderer/item.store.ts index e09c65d43d..eccd2b52df 100644 --- a/src/renderer/item.store.ts +++ b/src/renderer/item.store.ts @@ -167,7 +167,7 @@ export abstract class ItemStore { async removeSelectedItems?(): Promise; // eslint-disable-next-line unused-imports/no-unused-vars-ts - async subscribe(...args: any[]): Promise<() => void> { + subscribe(...args: any[]) { return noop; } diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index d23626a926..760ebd3335 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -211,11 +211,8 @@ export abstract class KubeObjectStore extends ItemSt return [this.api]; } - async subscribe(apis = this.getSubscribeApis()): Promise<() => void> { - const cluster = await this.resolveCluster(); - const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind)); - - return kubeWatchApi.subscribeApi(allowedApis); + subscribe(apis = this.getSubscribeApis()) { + return kubeWatchApi.subscribeApi(apis); } @action