1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

kubeWatchApi refactoring / fine-tuning

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-28 17:34:52 +02:00
parent 78260278c0
commit bbf9b7af32
3 changed files with 57 additions and 42 deletions

View File

@ -7,6 +7,7 @@ import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store"; import type { KubeObjectStore } from "../kube-object.store";
import type { NamespaceStore } from "../components/+namespaces/namespace.store"; import type { NamespaceStore } from "../components/+namespaces/namespace.store";
import plimit from "p-limit";
import debounce from "lodash/debounce"; import debounce from "lodash/debounce";
import { comparer, computed, observable, reaction } from "mobx"; import { comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils"; import { autobind, EventEmitter } from "../utils";
@ -27,6 +28,7 @@ export interface IKubeWatchMessage<T extends KubeObject = any> {
export interface IKubeWatchSubscribeStoreOptions { export interface IKubeWatchSubscribeStoreOptions {
preload?: boolean; // preload store items, default: true preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false
} }
export interface IKubeWatchReconnectOptions { export interface IKubeWatchReconnectOptions {
@ -105,10 +107,15 @@ export class KubeWatchApi {
return this.subscribers.get(api) || 0; return this.subscribers.get(api) || 0;
} }
subscribeApi(api: KubeApi | KubeApi[]) { isAllowedApi(api: KubeApi): boolean {
return this.cluster.isAllowedResource(api.kind);
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
const apis: KubeApi[] = [api].flat(); const apis: KubeApi[] = [api].flat();
apis.forEach(api => { apis.forEach(api => {
if (!this.isAllowedApi(api)) return; // skip
this.subscribers.set(api, this.getSubscribersCount(api) + 1); this.subscribers.set(api, this.getSubscribersCount(api) + 1);
}); });
@ -123,52 +130,63 @@ export class KubeWatchApi {
} }
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true } = options; const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
const loading: Promise<any>[] = []; const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const disposers: Function[] = []; const preloading: Promise<any>[] = [];
let isDisposed = false; const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
let isUnsubscribed = false;
async function subscribe() {
if (isDisposed) return;
const unsubscribeList = await Promise.all(stores.map(store => store.subscribe()));
disposers.push(...unsubscribeList);
if (isDisposed) unsubscribe();
}
function unsubscribe() {
isDisposed = true;
disposers.forEach(dispose => dispose());
disposers.length = 0;
}
if (preload) { if (preload) {
loading.push(...stores.map(store => store.loadAll())); for (const store of stores) {
preloading.push(limitRequests(async () => {
if (cacheLoading && store.isLoaded) return; // skip
return store.loadAll();
}));
}
} }
if (waitUntilLoaded) { const subscribe = () => {
Promise.all(loading).then(subscribe, error => { const unsubscribeList: (() => void)[] = [];
this.log({
message: new Error("Loading stores has failed"), const subscribeApis = () => {
meta: { stores, error, options }, if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
};
if (waitUntilLoaded) {
Promise.all(preloading).then(subscribeApis, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
}); });
}); } else {
} else { subscribeApis();
subscribe(); }
}
return unsubscribe; // unsubscribe
return () => {
if (isUnsubscribed) return;
isUnsubscribed = true;
limitRequests.clearQueue();
unsubscribeList.forEach(unsubscribe => unsubscribe());
};
};
return subscribe();
} }
protected connectionCheck() { protected async connectionCheck() {
if (!this.isConnected) {
this.log({ message: "Offline: reconnecting.." });
await this.connect();
}
this.log({ this.log({
message: "connection check", message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
meta: { connected: this.isConnected }, meta: { connected: this.isConnected },
}); });
if (this.isConnected) return;
return this.connect();
} }
protected async connect(apis = this.apis) { protected async connect(apis = this.apis) {

View File

@ -167,7 +167,7 @@ export abstract class ItemStore<T extends ItemObject = ItemObject> {
async removeSelectedItems?(): Promise<any>; async removeSelectedItems?(): Promise<any>;
// eslint-disable-next-line unused-imports/no-unused-vars-ts // eslint-disable-next-line unused-imports/no-unused-vars-ts
async subscribe(...args: any[]): Promise<() => void> { subscribe(...args: any[]) {
return noop; return noop;
} }

View File

@ -211,11 +211,8 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
return [this.api]; return [this.api];
} }
async subscribe(apis = this.getSubscribeApis()): Promise<() => void> { subscribe(apis = this.getSubscribeApis()) {
const cluster = await this.resolveCluster(); return kubeWatchApi.subscribeApi(apis);
const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind));
return kubeWatchApi.subscribeApi(allowedApis);
} }
@action @action