1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00
lens/src/renderer/api/kube-watch-api.ts
Jari Kolehmainen d7d4be7375
Improve kube watch error handling (#2112)
* improve kube watch error handling

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* review fixes

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* no more object.assign

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* keep only items from non-loaded namespaces

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* fix sleep/resume

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* fallback retry only on error

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
2021-02-11 17:34:14 +02:00

133 lines
4.2 KiB
TypeScript

// Kubernetes watch-api client
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import type { KubeObjectStore } from "../kube-object.store";
import type { ClusterContext } from "../components/context";
import plimit from "p-limit";
import { comparer, IReactionDisposer, observable, reaction, when } from "mobx";
import { autobind, noop } from "../utils";
import { KubeApi } from "./kube-api";
import { KubeJsonApiData } from "./kube-json-api";
import { isDebugging, isProduction } from "../../common/vars";
/**
 * Shape of a single event in the Kubernetes watch API.
 *
 * `T` is the type of the resource payload; it defaults to `KubeJsonApiData`.
 */
export interface IKubeWatchEvent<T = KubeJsonApiData> {
// event kind, one of the four literal values below
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
// payload attached to the event; optional, so consumers must handle its absence
object?: T;
}
/**
 * Options accepted by `KubeWatchApi.subscribeStores()`.
 *
 * Every field is optional; the defaults noted on each line are the ones
 * applied by `subscribeStores()` when the field is omitted.
 */
export interface IKubeWatchSubscribeStoreOptions {
namespaces?: string[]; // default: all accessible namespaces
preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false
}
/**
 * Argument of `KubeWatchApi.log()`.
 */
export interface IKubeWatchLog {
// text to print; an `Error` instance is routed to `console.error`, anything else to `console.info`
message: string | string[] | Error;
// extra fields spread into the metadata object that is logged alongside the message
meta?: object;
// CSS appended to the style applied to the `%c`-formatted log prefix
cssStyle?: string;
}
/**
 * Loads kube object stores and subscribes them to watch updates for the
 * namespaces of the current cluster context.
 */
@autobind()
export class KubeWatchApi {
  /** Current cluster context; starts as `null` and is assigned from outside this class. */
  @observable context: ClusterContext = null;

  /** mobx `when()` result that resolves once `context` becomes truthy. */
  contextReady = when(() => Boolean(this.context));

  /**
   * Checks whether the cluster of the current context allows the resource kind of `api`.
   *
   * @param api - API whose `kind` is checked
   * @returns `false` when no context is set, otherwise the coerced result of
   *   `cluster.isAllowedResource(api.kind)`
   */
  isAllowedApi(api: KubeApi): boolean {
    return Boolean(this.context?.cluster.isAllowedResource(api.kind));
  }

  /**
   * Queues `loadAll()` for every store, running at most one load at a time.
   *
   * @param stores - stores to load
   * @param opts - `namespaces` is forwarded to `store.loadAll()`; with `loadOnce`
   *   a store whose `isLoaded` flag is already set is skipped
   * @returns `loading` — a promise of the settled results of all queued loads
   *   (it never rejects); `cancelLoading` — clears the loads still waiting in the queue
   */
  preloadStores(stores: KubeObjectStore[], opts: { namespaces?: string[], loadOnce?: boolean } = {}) {
    const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
    const preloading: Promise<any>[] = [];

    for (const store of stores) {
      preloading.push(limitRequests(async () => {
        if (store.isLoaded && opts.loadOnce) return; // skip

        return store.loadAll({ namespaces: opts.namespaces });
      }));
    }

    return {
      loading: Promise.allSettled(preloading),
      cancelLoading: () => limitRequests.clearQueue(),
    };
  }

  /**
   * Preloads the given stores and subscribes each of them via `store.subscribe()`.
   * When the context namespaces change, the stores are unsubscribed, reloaded for
   * the new namespaces and subscribed again.
   *
   * @param stores - stores to load and subscribe
   * @param opts - see `IKubeWatchSubscribeStoreOptions`
   * @returns idempotent disposer that stops reloading, cancels queued loads and
   *   unsubscribes every store subscribed so far
   */
  subscribeStores(stores: KubeObjectStore[], opts: IKubeWatchSubscribeStoreOptions = {}): () => void {
    const { preload = true, waitUntilLoaded = true, loadOnce = false } = opts;
    const subscribingNamespaces = opts.namespaces ?? this.context?.allNamespaces ?? [];
    const unsubscribeList: Function[] = [];
    let isUnsubscribed = false;

    const load = (namespaces = subscribingNamespaces) => this.preloadStores(stores, { namespaces, loadOnce });

    // `undefined` (not `false`) when preloading is disabled: optional chaining only
    // short-circuits on null/undefined, so `false?.cancelLoading()` would throw a TypeError.
    let preloading = preload ? load() : undefined;
    let cancelReloading: IReactionDisposer = noop;

    const subscribe = () => {
      if (isUnsubscribed) return;

      stores.forEach((store) => {
        unsubscribeList.push(store.subscribe());
      });
    };

    const unsubscribeAll = () => {
      unsubscribeList.forEach(unsubscribe => unsubscribe());
      unsubscribeList.length = 0;
    };

    // Reports failed loads once `current` settles and, when requested, subscribes the stores.
    const handleLoaded = (current: ReturnType<typeof load>, subscribeAfter: boolean) => {
      current.loading.then(results => {
        // `Promise.allSettled()` never rejects, so failures have to be read from the results.
        const errors = results.flatMap(result => result.status === "rejected" ? [result.reason] : []);

        if (errors.length > 0) {
          this.log({
            message: new Error("Loading stores has failed"),
            meta: { stores, errors, options: opts },
          });
        }

        // Skip the subscription when a newer reload has replaced this one; clearing the
        // queue does not stop a load that is already running, so a stale load can still
        // settle and would otherwise subscribe every store a second time.
        if (subscribeAfter && current === preloading) {
          subscribe();
        }
      });
    };

    // NOTE(review): with `preload: false` nothing below runs, so the stores are never
    // subscribed — confirm whether callers rely on this before changing it.
    if (preloading) {
      if (!waitUntilLoaded) {
        subscribe();
      }
      handleLoaded(preloading, waitUntilLoaded);

      // reload stores only for context namespaces change
      cancelReloading = reaction(() => this.context?.contextNamespaces, namespaces => {
        preloading?.cancelLoading();
        unsubscribeAll();
        preloading = load(namespaces);
        handleLoaded(preloading, true);
      }, {
        equals: comparer.shallow,
      });
    }

    // unsubscribe
    return () => {
      if (isUnsubscribed) return;
      isUnsubscribed = true;
      cancelReloading();
      preloading?.cancelLoading();
      unsubscribeAll();
    };
  }

  /**
   * Prints a prefixed message to the console; does nothing in production unless debugging.
   *
   * @param message - text (or an `Error`, which is sent to `console.error`)
   * @param cssStyle - extra CSS appended to the prefix style
   * @param meta - extra fields logged together with the current local time
   */
  protected log({ message, cssStyle = "", meta = {} }: IKubeWatchLog) {
    if (isProduction && !isDebugging) {
      return;
    }

    // flatten so that a `string[]` message becomes separate console arguments
    const logInfo = [`%c[KUBE-WATCH-API]:`, `font-weight: bold; ${cssStyle}`, message].flat().map(String);
    const logMeta = {
      time: new Date().toLocaleString(),
      ...meta,
    };

    if (message instanceof Error) {
      console.error(...logInfo, logMeta);
    } else {
      console.info(...logInfo, logMeta);
    }
  }
}
/** Module-level `KubeWatchApi` instance, created once when this module is evaluated. */
export const kubeWatchApi = new KubeWatchApi();