mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Merge branch 'fix-1898/watch-api-streaming' into fix/store.loadAll
# Conflicts: # src/renderer/api/kube-watch-api.ts
This commit is contained in:
commit
4de50d6083
@ -6,6 +6,7 @@ import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } fr
|
||||
import type { KubeObject } from "./kube-object";
|
||||
import type { KubeObjectStore } from "../kube-object.store";
|
||||
|
||||
import plimit from "p-limit";
|
||||
import debounce from "lodash/debounce";
|
||||
import { comparer, computed, observable, reaction, when } from "mobx";
|
||||
import { autobind, EventEmitter } from "../utils";
|
||||
@ -26,6 +27,7 @@ export interface IKubeWatchMessage<T extends KubeObject = any> {
|
||||
export interface IKubeWatchSubscribeStoreOptions {
|
||||
preload?: boolean; // preload store items, default: true
|
||||
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
|
||||
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false
|
||||
}
|
||||
|
||||
export interface IKubeWatchReconnectOptions {
|
||||
@ -103,10 +105,15 @@ export class KubeWatchApi {
|
||||
return this.subscribers.get(api) || 0;
|
||||
}
|
||||
|
||||
subscribeApi(api: KubeApi | KubeApi[]) {
|
||||
isAllowedApi(api: KubeApi): boolean {
|
||||
return this.cluster.isAllowedResource(api.kind);
|
||||
}
|
||||
|
||||
subscribeApi(api: KubeApi | KubeApi[]): () => void {
|
||||
const apis: KubeApi[] = [api].flat();
|
||||
|
||||
apis.forEach(api => {
|
||||
if (!this.isAllowedApi(api)) return; // skip
|
||||
this.subscribers.set(api, this.getSubscribersCount(api) + 1);
|
||||
});
|
||||
|
||||
@ -121,52 +128,63 @@ export class KubeWatchApi {
|
||||
}
|
||||
|
||||
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
|
||||
const { preload = true, waitUntilLoaded = true } = options;
|
||||
const loading: Promise<any>[] = [];
|
||||
const disposers: Function[] = [];
|
||||
let isDisposed = false;
|
||||
|
||||
async function subscribe() {
|
||||
if (isDisposed) return;
|
||||
const unsubscribeList = await Promise.all(stores.map(store => store.subscribe()));
|
||||
|
||||
disposers.push(...unsubscribeList);
|
||||
if (isDisposed) unsubscribe();
|
||||
}
|
||||
|
||||
function unsubscribe() {
|
||||
isDisposed = true;
|
||||
disposers.forEach(dispose => dispose());
|
||||
disposers.length = 0;
|
||||
}
|
||||
const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
|
||||
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
|
||||
const preloading: Promise<any>[] = [];
|
||||
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
|
||||
let isUnsubscribed = false;
|
||||
|
||||
if (preload) {
|
||||
loading.push(...stores.map(store => store.loadAll(this.getNamespaces())));
|
||||
for (const store of stores) {
|
||||
preloading.push(limitRequests(async () => {
|
||||
if (cacheLoading && store.isLoaded) return; // skip
|
||||
|
||||
return store.loadAll(this.getNamespaces());
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
if (waitUntilLoaded) {
|
||||
Promise.all(loading).then(subscribe, error => {
|
||||
this.log({
|
||||
message: new Error("Loading stores has failed"),
|
||||
meta: { stores, error, options },
|
||||
const subscribe = () => {
|
||||
const unsubscribeList: (() => void)[] = [];
|
||||
|
||||
const subscribeApis = () => {
|
||||
if (isUnsubscribed) return;
|
||||
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
|
||||
};
|
||||
|
||||
if (waitUntilLoaded) {
|
||||
Promise.all(preloading).then(subscribeApis, error => {
|
||||
this.log({
|
||||
message: new Error("Loading stores has failed"),
|
||||
meta: { stores, error, options },
|
||||
});
|
||||
});
|
||||
});
|
||||
} else {
|
||||
subscribe();
|
||||
}
|
||||
} else {
|
||||
subscribeApis();
|
||||
}
|
||||
|
||||
return unsubscribe;
|
||||
// unsubscribe
|
||||
return () => {
|
||||
if (isUnsubscribed) return;
|
||||
isUnsubscribed = true;
|
||||
limitRequests.clearQueue();
|
||||
unsubscribeList.forEach(unsubscribe => unsubscribe());
|
||||
};
|
||||
};
|
||||
|
||||
return subscribe();
|
||||
}
|
||||
|
||||
protected connectionCheck() {
|
||||
protected async connectionCheck() {
|
||||
if (!this.isConnected) {
|
||||
this.log({ message: "Offline: reconnecting.." });
|
||||
await this.connect();
|
||||
}
|
||||
|
||||
this.log({
|
||||
message: "connection check",
|
||||
message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
|
||||
meta: { connected: this.isConnected },
|
||||
});
|
||||
|
||||
if (this.isConnected) return;
|
||||
|
||||
return this.connect();
|
||||
}
|
||||
|
||||
protected async connect(apis = this.apis) {
|
||||
|
||||
@ -167,7 +167,7 @@ export abstract class ItemStore<T extends ItemObject = ItemObject> {
|
||||
async removeSelectedItems?(): Promise<any>;
|
||||
|
||||
// eslint-disable-next-line unused-imports/no-unused-vars-ts
|
||||
async subscribe(...args: any[]): Promise<() => void> {
|
||||
subscribe(...args: any[]) {
|
||||
return noop;
|
||||
}
|
||||
|
||||
|
||||
@ -218,11 +218,8 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
return [this.api];
|
||||
}
|
||||
|
||||
async subscribe(apis = this.getSubscribeApis()): Promise<() => void> {
|
||||
const cluster = await this.resolveCluster();
|
||||
const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind));
|
||||
|
||||
return kubeWatchApi.subscribeApi(allowedApis);
|
||||
subscribe(apis = this.getSubscribeApis()) {
|
||||
return kubeWatchApi.subscribeApi(apis);
|
||||
}
|
||||
|
||||
@action
|
||||
|
||||
Loading…
Reference in New Issue
Block a user