1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Refactor watches to use native k8s api (#2095)

* fix lint

Signed-off-by: Roman <ixrock@gmail.com>

* fixes & refactoring

Signed-off-by: Roman <ixrock@gmail.com>

* fix lint, micro-refactoring

Signed-off-by: Roman <ixrock@gmail.com>

* more refactoring, clean up, responding to comments

Signed-off-by: Roman <ixrock@gmail.com>

* fix: remove extra check for cluster.allowedApi from processing buffered watch-api events

Signed-off-by: Roman <ixrock@gmail.com>

* refactoring, detaching NamespaceStore from KubeObjectStore

Signed-off-by: Roman <ixrock@gmail.com>

* fix: wait for contextReady in NamespaceStore

Signed-off-by: Roman <ixrock@gmail.com>

* refactoring & fixes

Signed-off-by: Roman <ixrock@gmail.com>

* fix lint

Signed-off-by: Roman <ixrock@gmail.com>

* fixes: reloading context stores on NamespaceSelect-change

Signed-off-by: Roman <ixrock@gmail.com>

* optimize loading all resources when "all namespaces" selected -> single request per resource (when have rights)

Signed-off-by: Roman <ixrock@gmail.com>

* use native k8s api watches

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* retry watch when it makes sense

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* workaround for browser connection limits

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* use always random subdomain for getResponse

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* resubscribe stores on contextNamespace change

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* fix

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* modify watch event before calling callback

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

Co-authored-by: Roman <ixrock@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-09 15:31:15 +02:00 committed by GitHub
parent 6ded5e73fa
commit 035dd470ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 414 additions and 599 deletions

View File

@ -189,6 +189,7 @@
"@kubernetes/client-node": "^0.12.0",
"array-move": "^3.0.0",
"await-lock": "^2.1.0",
"byline": "^5.0.0",
"chalk": "^4.1.0",
"chokidar": "^3.4.3",
"command-exists": "1.2.9",
@ -221,6 +222,7 @@
"react": "^17.0.1",
"react-dom": "^17.0.1",
"react-router": "^5.2.0",
"readable-web-to-node-stream": "^3.0.1",
"request": "^2.88.2",
"request-promise-native": "^1.0.8",
"semver": "^7.3.2",
@ -242,6 +244,7 @@
"@pmmmwh/react-refresh-webpack-plugin": "^0.4.3",
"@testing-library/jest-dom": "^5.11.5",
"@testing-library/react": "^11.1.0",
"@types/byline": "^4.2.32",
"@types/chart.js": "^2.9.21",
"@types/circular-dependency-plugin": "^5.0.1",
"@types/color": "^3.0.1",

View File

@ -194,7 +194,8 @@ export class LensProxy {
if (proxyTarget) {
// allow to fetch apis in "clusterId.localhost:port" from "localhost:port"
res.setHeader("Access-Control-Allow-Origin", this.origin);
// this should be safe because we have already validated cluster uuid
res.setHeader("Access-Control-Allow-Origin", "*");
return proxy.web(req, res, proxyTarget);
}

View File

@ -5,7 +5,7 @@ import path from "path";
import { readFile } from "fs-extra";
import { Cluster } from "./cluster";
import { apiPrefix, appName, publicPath, isDevelopment, webpackDevServerPort } from "../common/vars";
import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, watchRoute, versionRoute } from "./routes";
import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, versionRoute } from "./routes";
import logger from "./logger";
export interface RouterRequestOpts {
@ -146,9 +146,6 @@ export class Router {
this.router.add({ method: "get", path: "/version"}, versionRoute.getVersion.bind(versionRoute));
this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute));
// Watch API
this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
// Metrics API
this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute));

View File

@ -1,7 +1,6 @@
export * from "./kubeconfig-route";
export * from "./metrics-route";
export * from "./port-forward-route";
export * from "./watch-route";
export * from "./helm-route";
export * from "./resource-applier-route";
export * from "./version-route";

View File

@ -1,162 +0,0 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import plimit from "p-limit";
import { delay } from "../../common/utils";
import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api";
import { KubeConfig, Watch } from "@kubernetes/client-node";
import { ServerResponse } from "http";
import { Request } from "request";
import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END";
object?: T;
}
export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IWatchRoutePayload {
apis: string[]; // kube-api url list for subscribing to watch events
}
class ApiWatcher {
private apiUrl: string;
private response: ServerResponse;
private watchRequest: Request;
private watch: Watch;
private processor: NodeJS.Timeout;
private eventBuffer: any[] = [];
constructor(apiUrl: string, kubeConfig: KubeConfig, response: ServerResponse) {
this.apiUrl = apiUrl;
this.watch = new Watch(kubeConfig);
this.response = response;
}
public async start() {
if (this.processor) {
clearInterval(this.processor);
}
this.processor = setInterval(() => {
if (this.response.finished) return;
const events = this.eventBuffer.splice(0);
events.map(event => this.sendEvent(event));
this.response.flushHeaders();
}, 50);
this.watchRequest = await this.watch.watch(this.apiUrl, {}, this.watchHandler.bind(this), this.doneHandler.bind(this));
}
public stop() {
if (!this.watchRequest) {
return;
}
if (this.processor) {
clearInterval(this.processor);
}
logger.debug(`Stopping watcher for api: ${this.apiUrl}`);
try {
this.watchRequest.abort();
const event: IKubeWatchEventStreamEnd = {
type: "STREAM_END",
url: this.apiUrl,
status: 410,
};
this.sendEvent(event);
logger.debug("watch aborted");
} catch (error) {
logger.error(`Watch abort errored:${error}`);
}
}
private watchHandler(phase: string, obj: any) {
this.eventBuffer.push({
type: phase,
object: obj
});
}
private doneHandler(error: Error) {
if (error) logger.warn(`watch ended: ${error.toString()}`);
this.watchRequest.abort();
}
private sendEvent(evt: IKubeWatchEvent) {
this.response.write(`${JSON.stringify(evt)}\n`);
}
}
class WatchRoute extends LensApi {
private response: ServerResponse;
private setResponse(response: ServerResponse) {
// clean up previous connection and stop all corresponding watch-api requests
// otherwise it happens only by request timeout or something else..
this.response?.destroy();
this.response = response;
}
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) {
const { response, cluster, payload: { apis } = {} } = request;
if (!apis?.length) {
this.respondJson(response, {
message: "watch apis list is empty"
}, 400);
return;
}
this.setResponse(response);
response.setHeader("Content-Type", "application/json");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
// limit concurrent k8s requests to avoid possible ECONNRESET-error
const requests = plimit(5);
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watchers.set(apiUrl, watcher);
requests(async () => {
if (isWatchRequestEnded) return;
await watcher.start();
await delay(100);
});
});
function onRequestEnd() {
if (isWatchRequestEnded) return;
isWatchRequestEnded = true;
requests.clearQueue();
watchers.forEach(watcher => watcher.stop());
watchers.clear();
}
request.raw.req.on("end", () => {
logger.info("Watch request end");
onRequestEnd();
});
request.raw.req.on("close", () => {
logger.info("Watch request close");
onRequestEnd();
});
}
}
export const watchRoute = new WatchRoute();

View File

@ -3,7 +3,7 @@
import { stringify } from "querystring";
import { EventEmitter } from "../../common/event-emitter";
import { cancelableFetch } from "../utils/cancelableFetch";
import { randomBytes } from "crypto";
export interface JsonApiData {
}
@ -55,6 +55,34 @@ export class JsonApi<D = JsonApiData, P extends JsonApiParams = JsonApiParams> {
return this.request<T>(path, params, { ...reqInit, method: "get" });
}
getResponse(path: string, params?: P, init: RequestInit = {}): Promise<Response> {
const reqPath = `${this.config.apiBase}${path}`;
const subdomain = randomBytes(2).toString("hex");
let reqUrl = `http://${subdomain}.${window.location.host}${reqPath}`; // hack around browser connection limits (chromium allows 6 per domain)
const reqInit: RequestInit = { ...init };
const { query } = params || {} as P;
if (!reqInit.method) {
reqInit.method = "get";
}
if (query) {
const queryString = stringify(query);
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
}
const infoLog: JsonApiLog = {
method: reqInit.method.toUpperCase(),
reqUrl: reqPath,
reqInit,
};
this.writeLog({ ...infoLog });
return fetch(reqUrl, reqInit);
}
post<T = D>(path: string, params?: P, reqInit: RequestInit = {}) {
return this.request<T>(path, params, { ...reqInit, method: "post" });
}

View File

@ -9,7 +9,9 @@ import { apiKube } from "./index";
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
import { kubeWatchApi } from "./kube-watch-api";
import byline from "byline";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { IKubeWatchEvent } from "./kube-watch-api";
export interface IKubeApiOptions<T extends KubeObject> {
/**
@ -91,6 +93,12 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
}
type KubeApiWatchOptions = {
namespace: string;
callback?: (data: IKubeWatchEvent) => void;
abortController?: AbortController
};
export class KubeApi<T extends KubeObject = any> {
readonly kind: string;
readonly apiBase: string;
@ -104,6 +112,7 @@ export class KubeApi<T extends KubeObject = any> {
public objectConstructor: IKubeObjectConstructor<T>;
protected request: KubeJsonApi;
protected resourceVersions = new Map<string, string>();
protected watchDisposer: () => void;
constructor(protected options: IKubeApiOptions<T>) {
const {
@ -357,8 +366,88 @@ export class KubeApi<T extends KubeObject = any> {
});
}
watch(): () => void {
return kubeWatchApi.subscribeApi(this);
watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void {
if (!opts.abortController) {
opts.abortController = new AbortController();
}
const { abortController, namespace, callback } = opts;
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, null, {
signal: abortController.signal
});
responsePromise.then((response) => {
if (!response.ok && !abortController.signal.aborted) {
if (response.status === 410) { // resourceVersion has gone
setTimeout(() => {
this.refreshResourceVersion().then(() => {
this.watch({...opts, abortController});
});
}, 1000);
} else if (response.status >= 500) { // k8s is having hard time
setTimeout(() => {
this.watch({...opts, abortController});
}, 5000);
}
return;
}
const nodeStream = new ReadableWebToNodeStream(response.body);
const stream = byline(nodeStream);
stream.on("data", (line) => {
try {
const event: IKubeWatchEvent = JSON.parse(line);
this.modifyWatchEvent(event);
if (callback) {
callback(event);
}
} catch (ignore) {
// ignore parse errors
}
});
stream.on("close", () => {
setTimeout(() => {
if (!abortController.signal.aborted) this.watch({...opts, namespace, callback});
}, 1000);
});
}, (error) => {
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
console.error("watch rejected", error);
}).catch((error) => {
console.error("watch error", error);
});
const disposer = () => {
abortController.abort();
};
return disposer;
}
protected modifyWatchEvent(event: IKubeWatchEvent) {
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
ensureObjectSelfLink(this, event.object);
const { namespace, resourceVersion } = event.object.metadata;
this.setResourceVersion(namespace, resourceVersion);
this.setResourceVersion("", resourceVersion);
break;
}
}
}
}

View File

@ -1,143 +1,63 @@
// Kubernetes watch-api client
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import type { Cluster } from "../../main/cluster";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import type { ClusterContext } from "../components/context";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { autorun, comparer, computed, IReactionDisposer, observable, reaction } from "mobx";
import { autobind, EventEmitter, noop } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars";
import { apiManager } from "./api-manager";
import { comparer, IReactionDisposer, observable, reaction, when } from "mobx";
import { autobind, noop } from "../utils";
import { KubeApi } from "./kube-api";
import { KubeJsonApiData } from "./kube-json-api";
import { isDebugging, isProduction } from "../../common/vars";
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
export interface IKubeWatchMessage<T extends KubeObject = any> {
namespace?: string;
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
store?: KubeObjectStore<T>;
export interface IKubeWatchEvent<T = KubeJsonApiData> {
type: "ADDED" | "MODIFIED" | "DELETED";
object?: T;
}
export interface IKubeWatchSubscribeStoreOptions {
namespaces?: string[]; // default: all accessible namespaces
preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false
}
export interface IKubeWatchReconnectOptions {
reconnectAttempts: number;
timeout: number;
}
export interface IKubeWatchLog {
message: string | Error;
message: string | string[] | Error;
meta?: object;
cssStyle?: string;
}
@autobind()
export class KubeWatchApi {
private requestId = 0;
private reader: ReadableStreamReader<string>;
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@observable.ref private cluster: Cluster;
@observable.ref private namespaces: string[] = [];
@observable context: ClusterContext = null;
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
@computed get isReady(): boolean {
return Boolean(this.cluster && this.namespaces);
contextReady = when(() => Boolean(this.context));
constructor() {
this.init();
}
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get apis(): string[] {
if (!this.isReady) {
return [];
}
return Array.from(this.subscribers.keys()).map(api => {
if (!this.isAllowedApi(api)) {
return [];
}
// TODO: optimize - check when all namespaces are selected and then request all in one
if (api.isNamespaced && !this.cluster.isGlobalWatchEnabled) {
return this.namespaces.map(namespace => api.getWatchUrl(namespace));
}
return api.getWatchUrl();
}).flat();
}
async init({ getCluster, getNamespaces }: {
getCluster: () => Cluster,
getNamespaces: () => string[],
}): Promise<void> {
autorun(() => {
this.cluster = getCluster();
this.namespaces = getNamespaces();
});
this.bindAutoConnect();
}
private bindAutoConnect() {
const connect = debounce(() => this.connect(), 1000);
reaction(() => this.apis, connect, {
fireImmediately: true,
equals: comparer.structural,
});
window.addEventListener("online", () => this.connect());
window.addEventListener("offline", () => this.disconnect());
setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m
}
getSubscribersCount(api: KubeApi) {
return this.subscribers.get(api) || 0;
private async init() {
await this.contextReady;
}
isAllowedApi(api: KubeApi): boolean {
return Boolean(this?.cluster.isAllowedResource(api.kind));
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
const apis: KubeApi[] = [api].flat();
apis.forEach(api => {
if (!this.isAllowedApi(api)) return; // skip
this.subscribers.set(api, this.getSubscribersCount(api) + 1);
});
return () => {
apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
});
};
}
preloadStores(stores: KubeObjectStore[], { loadOnce = false } = {}) {
preloadStores(stores: KubeObjectStore[], opts: { namespaces?: string[], loadOnce?: boolean } = {}) {
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (store.isLoaded && loadOnce) return; // skip
if (store.isLoaded && opts.loadOnce) return; // skip
return store.loadAll(this.namespaces);
return store.loadAll({ namespaces: opts.namespaces });
}));
}
@ -147,19 +67,22 @@ export class KubeWatchApi {
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, loadOnce = false } = options;
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const unsubscribeList: (() => void)[] = [];
subscribeStores(stores: KubeObjectStore[], opts: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, loadOnce = false, } = opts;
const subscribingNamespaces = opts.namespaces ?? this.context?.allNamespaces ?? [];
const unsubscribeList: Function[] = [];
let isUnsubscribed = false;
const load = () => this.preloadStores(stores, { loadOnce });
const load = (namespaces = subscribingNamespaces) => this.preloadStores(stores, { namespaces, loadOnce });
let preloading = preload && load();
let cancelReloading: IReactionDisposer = noop;
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
stores.forEach((store) => {
unsubscribeList.push(store.subscribe());
});
};
if (preloading) {
@ -167,17 +90,20 @@ export class KubeWatchApi {
preloading.loading.then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
meta: { stores, error, options: opts },
});
});
} else {
subscribe();
}
// reload when context namespaces changes
cancelReloading = reaction(() => this.namespaces, () => {
// reload stores only for context namespaces change
cancelReloading = reaction(() => this.context?.contextNamespaces, namespaces => {
preloading?.cancelLoading();
preloading = load();
unsubscribeList.forEach(unsubscribe => unsubscribe());
unsubscribeList.length = 0;
preloading = load(namespaces);
preloading.loading.then(subscribe);
}, {
equals: comparer.shallow,
});
@ -190,184 +116,25 @@ export class KubeWatchApi {
cancelReloading();
preloading?.cancelLoading();
unsubscribeList.forEach(unsubscribe => unsubscribe());
unsubscribeList.length = 0;
};
}
protected async connectionCheck() {
if (!this.isConnected) {
this.log({ message: "Offline: reconnecting.." });
await this.connect();
}
this.log({
message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
meta: { connected: this.isConnected },
});
}
protected async connect(apis = this.apis) {
this.disconnect(); // close active connections first
if (!navigator.onLine || !apis.length) {
this.isConnected = false;
return;
}
this.log({
message: "Connecting",
meta: { apis }
});
try {
const requestId = ++this.requestId;
const abortController = new AbortController();
const request = await fetch(`${apiPrefix}/watch`, {
method: "POST",
body: JSON.stringify({ apis } as IWatchRoutePayload),
signal: abortController.signal,
headers: {
"content-type": "application/json"
}
});
// request above is stale since new request-id has been issued
if (this.requestId !== requestId) {
abortController.abort();
return;
}
let jsonBuffer = "";
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.isConnected = true;
this.reader = reader;
while (true) {
const { done, value } = await reader.read();
if (done) break; // exit
const events = (jsonBuffer + value).split("\n");
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
} finally {
this.isConnected = false;
}
}
protected disconnect() {
this.reader?.cancel();
this.reader = null;
this.isConnected = false;
}
// process received stream events, returns unprocessed buffer chunk if any
protected processBuffer(events: string[]): string {
for (const json of events) {
try {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
if (!this.namespaces.includes(message.namespace)) {
continue; // skip updates from non-watching resources context
}
this.onMessage.emit(message);
} catch (error) {
return json;
}
}
return "";
}
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
message.data = data;
if (api) {
ensureObjectSelfLink(api, data.object);
const { namespace, resourceVersion } = data.object.metadata;
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
message.api = api;
message.store = apiManager.getStore(api);
message.namespace = namespace;
}
break;
}
case "ERROR":
message.error = event as IKubeWatchEvent<KubeJsonApiError>;
break;
case "STREAM_END": {
this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, {
reconnectAttempts: 5,
timeout: 1000,
});
break;
}
}
return message;
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) {
const { apiBase, namespace } = parseKubeApi(event.url);
const api = apiManager.getApi(apiBase);
if (!api) return;
try {
await api.refreshResourceVersion({ namespace });
this.connect();
} catch (error) {
this.log({
message: new Error(`Failed to connect on single stream end: ${error}`),
meta: { event, error },
});
if (this.isActive && opts?.reconnectAttempts > 0) {
opts.reconnectAttempts--;
setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event
}
}
}
protected log({ message, meta = {} }: IKubeWatchLog) {
protected log({ message, cssStyle = "", meta = {} }: IKubeWatchLog) {
if (isProduction && !isDebugging) {
return;
}
const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
const isError = message instanceof Error;
const textStyle = `font-weight: bold;`;
const time = new Date().toLocaleString();
const logInfo = [`%c[KUBE-WATCH-API]:`, `font-weight: bold; ${cssStyle}`, message].flat().map(String);
const logMeta = {
time: new Date().toLocaleString(),
...meta,
};
if (isError) {
console.error(logMessage, textStyle, { time, ...meta });
if (message instanceof Error) {
console.error(...logInfo, logMeta);
} else {
console.info(logMessage, textStyle, { time, ...meta });
console.info(...logInfo, logMeta);
}
}
}

View File

@ -73,8 +73,8 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
}
}
async loadSelectedNamespaces(): Promise<void> {
return this.loadAll(namespaceStore.getContextNamespaces());
async loadFromContextNamespaces(): Promise<void> {
return this.loadAll(namespaceStore.contextNamespaces);
}
async loadItems(namespaces: string[]) {
@ -86,7 +86,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async create(payload: IReleaseCreatePayload) {
const response = await helmReleasesApi.create(payload);
if (this.isLoaded) this.loadSelectedNamespaces();
if (this.isLoaded) this.loadFromContextNamespaces();
return response;
}
@ -94,7 +94,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async update(name: string, namespace: string, payload: IReleaseUpdatePayload) {
const response = await helmReleasesApi.update(name, namespace, payload);
if (this.isLoaded) this.loadSelectedNamespaces();
if (this.isLoaded) this.loadFromContextNamespaces();
return response;
}
@ -102,7 +102,7 @@ export class ReleaseStore extends ItemStore<HelmRelease> {
async rollback(name: string, namespace: string, revision: number) {
const response = await helmReleasesApi.rollback(name, namespace, revision);
if (this.isLoaded) this.loadSelectedNamespaces();
if (this.isLoaded) this.loadFromContextNamespaces();
return response;
}

View File

@ -30,7 +30,7 @@ export class CrdResources extends React.Component<Props> {
const { store } = this;
if (store && !store.isLoading && !store.isLoaded) {
store.loadSelectedNamespaces();
store.reloadAll();
}
})
]);
@ -97,7 +97,7 @@ export class CrdResources extends React.Component<Props> {
...extraColumns.map((column) => {
let value = jsonPath.value(crdInstance, parseJsonPath(column.jsonPath.slice(1)));
if (Array.isArray(value) || typeof value === "object") {
if (Array.isArray(value) || typeof value === "object") {
value = JSON.stringify(value);
}

View File

@ -14,7 +14,7 @@ export interface KubeEventDetailsProps {
@observer
export class KubeEventDetails extends React.Component<KubeEventDetailsProps> {
async componentDidMount() {
eventStore.loadSelectedNamespaces();
eventStore.reloadAll();
}
render() {

View File

@ -32,8 +32,8 @@ export class NamespaceDetails extends React.Component<Props> {
}
componentDidMount() {
resourceQuotaStore.loadSelectedNamespaces();
limitRangeStore.loadSelectedNamespaces();
resourceQuotaStore.reloadAll();
limitRangeStore.reloadAll();
}
render() {

View File

@ -82,7 +82,7 @@ export class NamespaceSelect extends React.Component<Props> {
@observer
export class NamespaceSelectFilter extends React.Component {
@computed get placeholder(): React.ReactNode {
const namespaces = namespaceStore.getContextNamespaces();
const namespaces = namespaceStore.contextNamespaces;
switch (namespaces.length) {
case 0:

View File

@ -1,10 +1,9 @@
import { action, comparer, computed, IReactionDisposer, IReactionOptions, observable, reaction, toJS, when } from "mobx";
import { action, comparer, computed, IReactionDisposer, IReactionOptions, observable, reaction } from "mobx";
import { autobind, createStorage } from "../../utils";
import { KubeObjectStore, KubeObjectStoreLoadingParams } from "../../kube-object.store";
import { Namespace, namespacesApi } from "../../api/endpoints/namespaces.api";
import { createPageParam } from "../../navigation";
import { apiManager } from "../../api/api-manager";
import { clusterStore, getHostedCluster } from "../../../common/cluster-store";
const storage = createStorage<string[]>("context_namespaces", []);
@ -35,9 +34,6 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
api = namespacesApi;
@observable private contextNs = observable.set<string>();
@observable isReady = false;
whenReady = when(() => this.isReady);
constructor() {
super();
@ -45,15 +41,11 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
}
private async init() {
await clusterStore.whenLoaded;
if (!getHostedCluster()) return;
await getHostedCluster().whenReady; // wait for cluster-state from main
await this.contextReady;
this.setContext(this.initialNamespaces);
this.autoLoadAllowedNamespaces();
this.autoUpdateUrlAndLocalStorage();
this.isReady = true;
}
public onContextChange(callback: (contextNamespaces: string[]) => void, opts: IReactionOptions = {}): IReactionDisposer {
@ -73,16 +65,12 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
}
private autoLoadAllowedNamespaces(): IReactionDisposer {
return reaction(() => this.allowedNamespaces, namespaces => this.loadAll(namespaces), {
return reaction(() => this.allowedNamespaces, namespaces => this.loadAll({ namespaces }), {
fireImmediately: true,
equals: comparer.shallow,
});
}
@computed get allowedNamespaces(): string[] {
return toJS(getHostedCluster().allowedNamespaces);
}
@computed
private get initialNamespaces(): string[] {
const namespaces = new Set(this.allowedNamespaces);
@ -103,27 +91,26 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
return [];
}
getContextNamespaces(): string[] {
@computed get allowedNamespaces(): string[] {
return Array.from(new Set([
...(this.context?.allNamespaces ?? []), // allowed namespaces from cluster (main), updating every 30s
...this.items.map(item => item.getName()), // loaded namespaces from k8s api
].flat()));
}
@computed get contextNamespaces(): string[] {
const namespaces = Array.from(this.contextNs);
// show all namespaces when nothing selected
if (!namespaces.length) {
// return actual namespaces list since "allowedNamespaces" updating every 30s in cluster and thus might be stale
if (this.isLoaded) {
return this.items.map(namespace => namespace.getName());
}
return this.allowedNamespaces;
return this.allowedNamespaces; // show all namespaces when nothing selected
}
return namespaces;
}
getSubscribeApis() {
const { accessibleNamespaces } = getHostedCluster();
// if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted
if (accessibleNamespaces.length > 0) {
if (this.context?.cluster.accessibleNamespaces.length > 0) {
return [];
}

View File

@ -29,7 +29,7 @@ export class NodeDetails extends React.Component<Props> {
});
async componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
componentWillUnmount() {

View File

@ -7,7 +7,7 @@ import { Dialog, DialogProps } from "../dialog";
import { Wizard, WizardStep } from "../wizard";
import { Select, SelectOption } from "../select";
import { SubTitle } from "../layout/sub-title";
import { IRoleBindingSubject, RoleBinding, ServiceAccount, Role } from "../../api/endpoints";
import { IRoleBindingSubject, Role, RoleBinding, ServiceAccount } from "../../api/endpoints";
import { Icon } from "../icon";
import { Input } from "../input";
import { NamespaceSelect } from "../+namespaces/namespace-select";
@ -19,6 +19,7 @@ import { namespaceStore } from "../+namespaces/namespace.store";
import { serviceAccountsStore } from "../+user-management-service-accounts/service-accounts.store";
import { roleBindingsStore } from "./role-bindings.store";
import { showDetails } from "../kube-object";
import { KubeObjectStore } from "../../kube-object.store";
interface BindingSelectOption extends SelectOption {
value: string; // binding name
@ -73,14 +74,14 @@ export class AddRoleBindingDialog extends React.Component<Props> {
};
async loadData() {
const stores = [
const stores: KubeObjectStore[] = [
namespaceStore,
rolesStore,
serviceAccountsStore,
];
this.isLoading = true;
await Promise.all(stores.map(store => store.loadSelectedNamespaces()));
await Promise.all(stores.map(store => store.reloadAll()));
this.isLoading = false;
}
@ -136,8 +137,7 @@ export class AddRoleBindingDialog extends React.Component<Props> {
roleBinding: this.roleBinding,
addSubjects: subjects,
});
}
else {
} else {
const name = useRoleForBindingName ? selectedRole.getName() : bindingName;
roleBinding = await roleBindingsStore.create({ name, namespace }, {
@ -265,7 +265,7 @@ export class AddRoleBindingDialog extends React.Component<Props> {
</h5>
);
const disableNext = this.isLoading || !selectedRole || !selectedBindings.length;
const nextLabel = isEditing ? "Update" : "Create";
const nextLabel = isEditing ? "Update" : "Create";
return (
<Dialog

View File

@ -20,7 +20,7 @@ interface Props extends KubeObjectDetailsProps<CronJob> {
@observer
export class CronJobDetails extends React.Component<Props> {
async componentDidMount() {
jobStore.loadSelectedNamespaces();
jobStore.reloadAll();
}
render() {

View File

@ -30,7 +30,7 @@ export class DaemonSetDetails extends React.Component<Props> {
});
componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
componentWillUnmount() {

View File

@ -31,7 +31,7 @@ export class DeploymentDetails extends React.Component<Props> {
});
componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
componentWillUnmount() {

View File

@ -25,7 +25,7 @@ interface Props extends KubeObjectDetailsProps<Job> {
@observer
export class JobDetails extends React.Component<Props> {
async componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
render() {

View File

@ -6,7 +6,6 @@ import { OverviewWorkloadStatus } from "./overview-workload-status";
import { Link } from "react-router-dom";
import { workloadURL, workloadStores } from "../+workloads";
import { namespaceStore } from "../+namespaces/namespace.store";
import { PageFiltersList } from "../item-object-list/page-filters-list";
import { NamespaceSelectFilter } from "../+namespaces/namespace-select";
import { isAllowedResource, KubeResource } from "../../../common/rbac";
import { ResourceNames } from "../../../renderer/utils/rbac";
@ -27,7 +26,7 @@ export class OverviewStatuses extends React.Component {
@autobind()
renderWorkload(resource: KubeResource): React.ReactElement {
const store = workloadStores[resource];
const items = store.getAllByNs(namespaceStore.getContextNamespaces());
const items = store.getAllByNs(namespaceStore.contextNamespaces);
return (
<div className="workload" key={resource}>
@ -50,7 +49,6 @@ export class OverviewStatuses extends React.Component {
<h5 className="box grow">Overview</h5>
<NamespaceSelectFilter />
</div>
<PageFiltersList />
<div className="workloads">
{workloads}
</div>

View File

@ -16,6 +16,7 @@ import { cronJobStore } from "../+workloads-cronjobs/cronjob.store";
import { Events } from "../+events";
import { isAllowedResource } from "../../../common/rbac";
import { kubeWatchApi } from "../../api/kube-watch-api";
import { clusterContext } from "../context";
interface Props extends RouteComponentProps<IWorkloadsOverviewRouteParams> {
}
@ -29,6 +30,7 @@ export class WorkloadsOverview extends React.Component<Props> {
jobStore, cronJobStore, eventStore,
], {
preload: true,
namespaces: clusterContext.contextNamespaces,
}),
]);
}

View File

@ -29,7 +29,7 @@ export class ReplicaSetDetails extends React.Component<Props> {
});
async componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
componentWillUnmount() {

View File

@ -30,7 +30,7 @@ export class StatefulSetDetails extends React.Component<Props> {
});
componentDidMount() {
podsStore.loadSelectedNamespaces();
podsStore.reloadAll();
}
componentWillUnmount() {

View File

@ -43,12 +43,13 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte
import { TabLayout, TabLayoutRoute } from "./layout/tab-layout";
import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog";
import { eventStore } from "./+events/event.store";
import { namespaceStore } from "./+namespaces/namespace.store";
import { nodesStore } from "./+nodes/nodes.store";
import { podsStore } from "./+workloads-pods/pods.store";
import { kubeWatchApi } from "../api/kube-watch-api";
import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog";
import { CommandContainer } from "./command-palette/command-container";
import { KubeObjectStore } from "../kube-object.store";
import { clusterContext } from "./context";
@observer
export class App extends React.Component {
@ -76,11 +77,9 @@ export class App extends React.Component {
});
whatInput.ask(); // Start to monitor user input device
await namespaceStore.whenReady;
await kubeWatchApi.init({
getCluster: getHostedCluster,
getNamespaces: namespaceStore.getContextNamespaces,
});
// Setup hosted cluster context
KubeObjectStore.defaultContext = clusterContext;
kubeWatchApi.context = clusterContext;
}
componentDidMount() {
@ -163,9 +162,9 @@ export class App extends React.Component {
const tabRoutes = this.getTabLayoutRoutes(menu);
if (tabRoutes.length > 0) {
const pageComponent = () => <TabLayout tabs={tabRoutes} />;
const pageComponent = () => <TabLayout tabs={tabRoutes}/>;
route = <Route key={`extension-tab-layout-route-${index}`} component={pageComponent} path={tabRoutes.map((tab) => tab.routePath)} />;
route = <Route key={`extension-tab-layout-route-${index}`} component={pageComponent} path={tabRoutes.map((tab) => tab.routePath)}/>;
this.extensionRoutes.set(menu, route);
} else {
const page = clusterPageRegistry.getByPageTarget(menu.target);
@ -229,7 +228,7 @@ export class App extends React.Component {
<StatefulSetScaleDialog/>
<ReplicaSetScaleDialog/>
<CronJobTriggerDialog/>
<CommandContainer cluster={cluster} />
<CommandContainer cluster={cluster}/>
</ErrorBoundary>
</Router>
);

View File

@ -0,0 +1,23 @@
import type { Cluster } from "../../main/cluster";
import { getHostedCluster } from "../../common/cluster-store";
import { namespaceStore } from "./+namespaces/namespace.store";
export interface ClusterContext {
cluster?: Cluster;
allNamespaces?: string[]; // available / allowed namespaces from cluster.ts
contextNamespaces?: string[]; // selected by user (see: namespace-select.tsx)
}
export const clusterContext: ClusterContext = {
get cluster(): Cluster | null {
return getHostedCluster();
},
get allNamespaces(): string[] {
return this.cluster?.allowedNamespaces ?? [];
},
get contextNamespaces(): string[] {
return namespaceStore.contextNamespaces ?? [];
},
};

View File

@ -80,7 +80,7 @@ export class UpgradeChartStore extends DockTabStore<IChartUpgradeData> {
const values = this.values.getData(tabId);
await Promise.all([
!releaseStore.isLoaded && releaseStore.loadSelectedNamespaces(),
!releaseStore.isLoaded && releaseStore.loadFromContextNamespaces(),
!values && this.loadValues(tabId)
]);
}

View File

@ -38,6 +38,7 @@ interface IHeaderPlaceholders {
export interface ItemListLayoutProps<T extends ItemObject = ItemObject> {
tableId?: string;
className: IClassName;
items?: T[];
store: ItemStore<T>;
dependentStores?: ItemStore[];
preloadStores?: boolean;
@ -138,7 +139,8 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
const { store, dependentStores } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
stores.forEach(store => store.loadAll(namespaceStore.getContextNamespaces()));
// load context namespaces by default (see also: `<NamespaceSelectFilter/>`)
stores.forEach(store => store.loadAll(namespaceStore.contextNamespaces));
}
private filterCallbacks: { [type: string]: ItemsFilter } = {
@ -179,11 +181,7 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
@computed get filters() {
let { activeFilters } = pageFilters;
const { isClusterScoped, isSearchable, searchFilters } = this.props;
if (isClusterScoped) {
activeFilters = activeFilters.filter(({ type }) => type !== FilterType.NAMESPACE);
}
const { isSearchable, searchFilters } = this.props;
if (!(isSearchable && searchFilters)) {
activeFilters = activeFilters.filter(({ type }) => type !== FilterType.SEARCH);
@ -217,7 +215,9 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
}
});
return this.applyFilters(filterItems, allItems);
const items = this.props.items ?? allItems;
return this.applyFilters(filterItems, items);
}
@autobind()
@ -337,8 +337,8 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
}
renderInfo() {
const { allItems, items, isReady, userSettings, filters } = this;
const allItemsCount = allItems.length;
const { items, isReady, userSettings, filters } = this;
const allItemsCount = this.props.store.getTotalCount();
const itemsCount = items.length;
const isFiltered = isReady && filters.length > 0;

View File

@ -1,6 +1,5 @@
import { computed, observable, reaction } from "mobx";
import { autobind } from "../../utils";
import { namespaceStore } from "../+namespaces/namespace.store";
import { searchUrlParam } from "../input/search-input-url";
export enum FilterType {
@ -24,32 +23,6 @@ export class PageFiltersStore {
constructor() {
this.syncWithGlobalSearch();
this.syncWithContextNamespace();
}
protected syncWithContextNamespace() {
const disposers = [
reaction(() => this.getValues(FilterType.NAMESPACE), filteredNs => {
if (filteredNs.length !== namespaceStore.getContextNamespaces().length) {
namespaceStore.setContext(filteredNs);
}
}),
namespaceStore.onContextChange(namespaces => {
const filteredNs = this.getValues(FilterType.NAMESPACE);
const isChanged = namespaces.length !== filteredNs.length;
if (isChanged) {
this.filters.replace([
...this.filters.filter(({ type }) => type !== FilterType.NAMESPACE),
...namespaces.map(ns => ({ type: FilterType.NAMESPACE, value: ns })),
]);
}
}, {
fireImmediately: true
})
];
return () => disposers.forEach(dispose => dispose());
}
protected syncWithGlobalSearch() {

View File

@ -8,6 +8,7 @@ import { KubeObjectStore } from "../../kube-object.store";
import { KubeObjectMenu } from "./kube-object-menu";
import { kubeSelectedUrlParam, showDetails } from "./kube-object-details";
import { kubeWatchApi } from "../../api/kube-watch-api";
import { clusterContext } from "../context";
export interface KubeObjectListLayoutProps extends ItemListLayoutProps {
store: KubeObjectStore;
@ -26,7 +27,8 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores(stores, {
preload: true
preload: true,
namespaces: clusterContext.contextNamespaces,
})
]);
}
@ -40,12 +42,14 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
};
render() {
const items = this.props.store.contextItems;
const { className, ...layoutProps } = this.props;
return (
<ItemListLayout
{...layoutProps}
className={cssNames("KubeObjectListLayout", className)}
items={items}
preloadStores={false} // loading handled in kubeWatchApi.subscribeStores()
detailsItem={this.selectedItem}
onDetails={this.onDetails}

View File

@ -40,7 +40,7 @@ interface Props {
@observer
export class Sidebar extends React.Component<Props> {
async componentDidMount() {
crdStore.loadSelectedNamespaces();
crdStore.reloadAll();
}
renderCustomResources() {

View File

@ -9,7 +9,7 @@ export interface ItemObject {
@autobind()
export abstract class ItemStore<T extends ItemObject = ItemObject> {
abstract loadAll(...args: any[]): Promise<void>;
abstract loadAll(...args: any[]): Promise<void | T[]>;
protected defaultSorting = (item: T) => item.getName();
@ -22,11 +22,23 @@ export abstract class ItemStore<T extends ItemObject = ItemObject> {
return this.items.filter(item => this.selectedItemsIds.get(item.getId()));
}
public getItems(): T[] {
return this.items.toJS();
}
public getTotalCount(): number {
return this.items.length;
}
getByName(name: string, ...args: any[]): T;
getByName(name: string): T {
return this.items.find(item => item.getName() === name);
}
getIndexById(id: string): number {
return this.items.findIndex(item => item.getId() === id);
}
@action
protected sortItems(items: T[] = this.items, sorting?: ((item: T) => any)[], order?: "asc" | "desc"): T[] {
return orderBy(items, sorting || this.defaultSorting, order);

View File

@ -1,8 +1,9 @@
import type { Cluster } from "../main/cluster";
import { action, observable, reaction } from "mobx";
import type { ClusterContext } from "./components/context";
import { action, computed, observable, reaction, when } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api";
import { IKubeWatchEvent } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api";
@ -15,15 +16,38 @@ export interface KubeObjectStoreLoadingParams {
@autobind()
export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemStore<T> {
@observable static defaultContext: ClusterContext; // TODO: support multiple cluster contexts
abstract api: KubeApi<T>;
public readonly limit?: number;
public readonly bufferSize: number = 50000;
private loadedNamespaces: string[] = [];
contextReady = when(() => Boolean(this.context));
constructor() {
super();
this.bindWatchEventsUpdater();
}
get context(): ClusterContext {
return KubeObjectStore.defaultContext;
}
@computed get contextItems(): T[] {
const namespaces = this.context?.contextNamespaces ?? [];
return this.items.filter(item => {
const itemNamespace = item.getNs();
return !itemNamespace /* cluster-wide */ || namespaces.includes(itemNamespace);
});
}
getTotalCount(): number {
return this.contextItems.length;
}
get query(): IKubeApiQueryParams {
const { limit } = this;
@ -79,23 +103,25 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
}
protected async resolveCluster(): Promise<Cluster> {
const { getHostedCluster } = await import("../common/cluster-store");
return getHostedCluster();
}
protected async loadItems({ namespaces, api }: KubeObjectStoreLoadingParams): Promise<T[]> {
const cluster = await this.resolveCluster();
if (this.context?.cluster.isAllowedResource(api.kind)) {
if (!api.isNamespaced) {
return api.list({}, this.query);
}
if (cluster.isAllowedResource(api.kind)) {
if (api.isNamespaced) {
return Promise
const isLoadingAll = this.context.allNamespaces.every(ns => namespaces.includes(ns));
if (isLoadingAll) {
this.loadedNamespaces = [];
return api.list({}, this.query);
} else {
this.loadedNamespaces = namespaces;
return Promise // load resources per namespace
.all(namespaces.map(namespace => api.list({ namespace })))
.then(items => items.flat());
}
return api.list({}, this.query);
}
return [];
@ -106,24 +132,25 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
@action
async loadAll(namespaces: string[] = []): Promise<void> {
async loadAll(options: { namespaces?: string[], merge?: boolean } = {}): Promise<void | T[]> {
await this.contextReady;
this.isLoading = true;
try {
if (!namespaces.length) {
const { namespaceStore } = await import("./components/+namespaces/namespace.store");
const {
namespaces = this.context.allNamespaces, // load all namespaces by default
merge = true, // merge loaded items or return as result
} = options;
// load all available namespaces by default
namespaces.push(...namespaceStore.allowedNamespaces);
}
const items = await this.loadItems({ namespaces, api: this.api });
let items = await this.loadItems({ namespaces, api: this.api });
items = this.filterItemsOnLoad(items);
items = this.sortItems(items);
this.items.replace(items);
this.isLoaded = true;
if (merge) {
this.mergeItems(items, { replace: false });
} else {
return items;
}
} catch (error) {
console.error("Loading store items failed", { error, store: this });
this.resetOnError(error);
@ -132,10 +159,36 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
}
async loadSelectedNamespaces(): Promise<void> {
const { namespaceStore } = await import("./components/+namespaces/namespace.store");
@action
reloadAll(opts: { force?: boolean, namespaces?: string[], merge?: boolean } = {}) {
const { force = false, ...loadingOptions } = opts;
return this.loadAll(namespaceStore.getContextNamespaces());
if (this.isLoading || (this.isLoaded && !force)) {
return;
}
return this.loadAll(loadingOptions);
}
@action
protected mergeItems(partialItems: T[], { replace = false, updateStore = true, sort = true, filter = true } = {}): T[] {
let items = partialItems;
// update existing items
if (!replace) {
const partialIds = partialItems.map(item => item.getId());
items = [
...this.items.filter(existingItem => !partialIds.includes(existingItem.getId())),
...partialItems,
];
}
if (filter) items = this.filterItemsOnLoad(items);
if (sort) items = this.sortItems(items);
if (updateStore) this.items.replace(items);
return items;
}
protected resetOnError(error: any) {
@ -204,12 +257,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
protected eventsBuffer = observable.array<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected bindWatchEventsUpdater(delay = 1000) {
kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage<T>) => {
if (!this.isLoaded || store !== this) return;
this.eventsBuffer.push(data);
});
reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, {
reaction(() => this.eventsBuffer.length, this.updateFromEventsBuffer, {
delay
});
}
@ -219,7 +267,31 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
subscribe(apis = this.getSubscribeApis()) {
return kubeWatchApi.subscribeApi(apis);
let disposers: {(): void}[] = [];
const callback = (data: IKubeWatchEvent) => {
if (!this.isLoaded) return;
this.eventsBuffer.push(data);
};
if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
disposers = apis.map(api => api.watch({
namespace: "",
callback: (data) => callback(data),
}));
} else {
apis.map(api => {
this.loadedNamespaces.forEach((namespace) => {
disposers.push(api.watch({
namespace,
callback: (data) => callback(data)
}));
});
});
}
return () => disposers.forEach(dispose => dispose());
}
@action
@ -239,7 +311,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
if (!item) {
items.push(newItem);
} else {
items.splice(index, 1, newItem);
items[index] = newItem;
}
break;
case "DELETED":

View File

@ -1079,6 +1079,13 @@
resolved "https://registry.yarnpkg.com/@types/boom/-/boom-7.3.0.tgz#33280c5552d4cfabc21b8b7e0f6d29292decd985"
integrity sha512-PH7bfkt1nu4pnlxz+Ws+wwJJF1HE12W3ia+Iace2JT7q56DLH3hbyjOJyNHJYRxk3PkKaC36fHfHKyeG1rMgCA==
"@types/byline@^4.2.32":
version "4.2.32"
resolved "https://registry.yarnpkg.com/@types/byline/-/byline-4.2.32.tgz#9d35ec15968056118548412ee24c2c3026c997dc"
integrity sha512-qtlm/J6XOO9p+Ep/ZB5+mCFEDhzWDDHWU4a1eReN7lkPZXW9rkloq2jcAhvKKmlO5tL2GSvKROb+PTsNVhBiyQ==
dependencies:
"@types/node" "*"
"@types/caseless@*":
version "0.12.2"
resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8"
@ -1608,6 +1615,14 @@
"@types/prop-types" "*"
csstype "^3.0.2"
"@types/readable-stream@^2.3.9":
version "2.3.9"
resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.9.tgz#40a8349e6ace3afd2dd1b6d8e9b02945de4566a9"
integrity sha512-sqsgQqFT7HmQz/V5jH1O0fvQQnXAJO46Gg9LRO/JPfjmVmGUlcx831TZZO3Y3HtWhIkzf3kTsNT0Z0kzIhIvZw==
dependencies:
"@types/node" "*"
safe-buffer "*"
"@types/relateurl@*":
version "0.2.28"
resolved "https://registry.yarnpkg.com/@types/relateurl/-/relateurl-0.2.28.tgz#6bda7db8653fa62643f5ee69e9f69c11a392e3a6"
@ -11449,6 +11464,14 @@ readable-stream@~1.1.10:
isarray "0.0.1"
string_decoder "~0.10.x"
readable-web-to-node-stream@^3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845"
integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA==
dependencies:
"@types/readable-stream" "^2.3.9"
readable-stream "^3.6.0"
readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309"
@ -11867,16 +11890,16 @@ rxjs@^6.5.2:
dependencies:
tslib "^1.9.0"
safe-buffer@*, safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0:
version "5.2.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==
safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0:
version "5.2.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==
safe-regex@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/safe-regex/-/safe-regex-1.1.0.tgz#40a3669f3b077d1e943d44629e157dd48023bf2e"