diff --git a/src/common/utils/delay.ts b/src/common/utils/delay.ts new file mode 100644 index 0000000000..208e042759 --- /dev/null +++ b/src/common/utils/delay.ts @@ -0,0 +1,6 @@ +// Create async delay for provided timeout in milliseconds + +export async function delay(timeoutMs = 1000) { + if (!timeoutMs) return; + await new Promise(resolve => setTimeout(resolve, timeoutMs)); +} diff --git a/src/common/utils/index.ts b/src/common/utils/index.ts index 582135d7f0..942c675f0a 100644 --- a/src/common/utils/index.ts +++ b/src/common/utils/index.ts @@ -7,6 +7,7 @@ export * from "./autobind"; export * from "./base64"; export * from "./camelCase"; export * from "./cloneJson"; +export * from "./delay"; export * from "./debouncePromise"; export * from "./defineGlobal"; export * from "./getRandId"; diff --git a/src/main/router.ts b/src/main/router.ts index 896893a592..6e98d0ce0c 100644 --- a/src/main/router.ts +++ b/src/main/router.ts @@ -146,7 +146,7 @@ export class Router { this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute)); // Watch API - this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); + this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); // Metrics API this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute)); diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index eb9f007eae..2c86314908 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -1,10 +1,29 @@ +import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api"; + +import plimit from "p-limit"; +import { delay } from "../../common/utils"; import { LensApiRequest } from "../router"; import { LensApi } from "../lens-api"; -import { Watch, KubeConfig } from "@kubernetes/client-node"; +import { KubeConfig, Watch } from "@kubernetes/client-node"; import { ServerResponse } from "http"; import { Request } from "request"; import logger from "../logger"; +export interface IKubeWatchEvent { + type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END"; + object?: T; +} + +export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent { + type: "STREAM_END"; + url: string; + status: number; +} + +export interface IWatchRoutePayload { + apis: string[]; // kube-api url list for subscribing to watch events +} + class ApiWatcher { private apiUrl: string; private response: ServerResponse; @@ -24,6 +43,7 @@ class ApiWatcher { clearInterval(this.processor); } this.processor = setInterval(() => { + if (this.response.finished) return; const events = this.eventBuffer.splice(0); events.map(event => this.sendEvent(event)); @@ -33,7 +53,9 @@ class ApiWatcher { } public stop() { - if (!this.watchRequest) { return; } + if (!this.watchRequest) { + return; + } if (this.processor) { clearInterval(this.processor); @@ -42,11 +64,14 @@ class ApiWatcher { try { this.watchRequest.abort(); - this.sendEvent({ + + const event: IKubeWatchEventStreamEnd = { type: "STREAM_END", url: this.apiUrl, status: 410, - }); + }; + + this.sendEvent(event); logger.debug("watch aborted"); } catch (error) { logger.error(`Watch abort errored:${error}`); @@ -65,50 +90,72 @@ class ApiWatcher { this.watchRequest.abort(); } - private sendEvent(evt: any) { - // convert to "text/event-stream" format - this.response.write(`data: ${JSON.stringify(evt)}\n\n`); + private sendEvent(evt: IKubeWatchEvent) { + this.response.write(`${JSON.stringify(evt)}\n`); } } class WatchRoute extends LensApi { + private response: ServerResponse; - public async routeWatch(request: LensApiRequest) { - const { response, cluster} = request; - const apis: string[] = request.query.getAll("api"); - const watchers: ApiWatcher[] = []; + private setResponse(response: ServerResponse) { + // clean up previous connection and stop all corresponding watch-api requests + // otherwise it happens only by request timeout or something else.. + this.response?.destroy(); + this.response = response; + } - if (!apis.length) { + public async routeWatch(request: LensApiRequest) { + const { response, cluster, payload: { apis } = {} } = request; + + if (!apis?.length) { this.respondJson(response, { - message: "Empty request. Query params 'api' are not provided.", - example: "?api=/api/v1/pods&api=/api/v1/nodes", + message: "watch apis list is empty" }, 400); return; } - response.setHeader("Content-Type", "text/event-stream"); + this.setResponse(response); + response.setHeader("Content-Type", "application/json"); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); + // limit concurrent k8s requests to avoid possible ECONNRESET-error + const requests = plimit(5); + const watchers = new Map(); + let isWatchRequestEnded = false; + apis.forEach(apiUrl => { const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); - watcher.start(); - watchers.push(watcher); + watchers.set(apiUrl, watcher); + + requests(async () => { + if (isWatchRequestEnded) return; + await watcher.start(); + await delay(100); + }); + }); + + function onRequestEnd() { + if (isWatchRequestEnded) return; + isWatchRequestEnded = true; + requests.clearQueue(); + watchers.forEach(watcher => watcher.stop()); + watchers.clear(); + } + + request.raw.req.on("end", () => { + logger.info("Watch request end"); + onRequestEnd(); }); request.raw.req.on("close", () => { - logger.debug("Watch request closed"); - watchers.map(watcher => watcher.stop()); + logger.info("Watch request close"); + onRequestEnd(); }); - - request.raw.req.on("end", () => { - logger.debug("Watch request ended"); - watchers.map(watcher => watcher.stop()); - }); - } } diff --git a/src/renderer/api/api-manager.ts b/src/renderer/api/api-manager.ts index 629a0f29c2..47500adf79 100644 --- a/src/renderer/api/api-manager.ts +++ b/src/renderer/api/api-manager.ts @@ -2,7 +2,7 @@ import type { KubeObjectStore } from "../kube-object.store"; import { action, observable } from "mobx"; import { autobind } from "../utils"; -import { KubeApi } from "./kube-api"; +import { KubeApi, parseKubeApi } from "./kube-api"; @autobind() export class ApiManager { @@ -11,7 +11,7 @@ export class ApiManager { getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) { if (typeof pathOrCallback === "string") { - return this.apis.get(pathOrCallback) || this.apis.get(KubeApi.parseApi(pathOrCallback).apiBase); + return this.apis.get(pathOrCallback) || this.apis.get(parseKubeApi(pathOrCallback).apiBase); } return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true)); diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index 8a3a2517c2..e62603b14f 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -92,14 +92,6 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) { } export class KubeApi { - static parseApi = parseKubeApi; - - static watchAll(...apis: KubeApi[]) { - const disposers = apis.map(api => api.watch()); - - return () => disposers.forEach(unwatch => unwatch()); - } - readonly kind: string; readonly apiBase: string; readonly apiPrefix: string; @@ -124,7 +116,7 @@ export class KubeApi { if (!options.apiBase) { options.apiBase = objectConstructor.apiBase; } - const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = KubeApi.parseApi(options.apiBase); + const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase); this.kind = kind; this.isNamespaced = isNamespaced; @@ -157,7 +149,7 @@ export class KubeApi { for (const apiUrl of apiBases) { // Split e.g. "/apis/extensions/v1beta1/ingresses" to parts - const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = KubeApi.parseApi(apiUrl); + const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl); // Request available resources try { @@ -366,7 +358,7 @@ export class KubeApi { } watch(): () => void { - return kubeWatchApi.subscribe(this); + return kubeWatchApi.subscribeApi(this); } } diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index fe35a04baa..8adf58676f 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -1,202 +1,349 @@ -// Kubernetes watch-api consumer +// Kubernetes watch-api client +// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams -import { computed, observable, reaction } from "mobx"; -import { stringify } from "querystring"; -import { autobind, EventEmitter } from "../utils"; -import { KubeJsonApiData } from "./kube-json-api"; +import type { Cluster } from "../../main/cluster"; +import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; +import type { KubeObject } from "./kube-object"; import type { KubeObjectStore } from "../kube-object.store"; -import { ensureObjectSelfLink, KubeApi } from "./kube-api"; +import type { NamespaceStore } from "../components/+namespaces/namespace.store"; + +import plimit from "p-limit"; +import debounce from "lodash/debounce"; +import { comparer, computed, observable, reaction } from "mobx"; +import { autobind, EventEmitter } from "../utils"; +import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; +import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; +import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; import { apiManager } from "./api-manager"; -import { apiPrefix, isDevelopment } from "../../common/vars"; -import { getHostedCluster } from "../../common/cluster-store"; -export interface IKubeWatchEvent { - type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR"; - object?: T; +export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; + +export interface IKubeWatchMessage { + data?: IKubeWatchEvent + error?: IKubeWatchEvent; + api?: KubeApi; + store?: KubeObjectStore; } -export interface IKubeWatchRouteEvent { - type: "STREAM_END"; - url: string; - status: number; +export interface IKubeWatchSubscribeStoreOptions { + preload?: boolean; // preload store items, default: true + waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true + cacheLoading?: boolean; // when enabled loading store will be skipped, default: false } -export interface IKubeWatchRouteQuery { - api: string | string[]; +export interface IKubeWatchReconnectOptions { + reconnectAttempts: number; + timeout: number; +} + +export interface IKubeWatchLog { + message: string | Error; + meta?: object; } @autobind() export class KubeWatchApi { - protected evtSource: EventSource; - protected onData = new EventEmitter<[IKubeWatchEvent]>(); - protected subscribers = observable.map(); - protected reconnectTimeoutMs = 5000; - protected maxReconnectsOnError = 10; - protected reconnectAttempts = this.maxReconnectsOnError; + private cluster: Cluster; + private namespaceStore: NamespaceStore; - constructor() { - reaction(() => this.activeApis, () => this.connect(), { - fireImmediately: true, - delay: 500, - }); + private requestId = 0; + private isConnected = false; + private reader: ReadableStreamReader; + private subscribers = observable.map(); + + // events + public onMessage = new EventEmitter<[IKubeWatchMessage]>(); + + @computed get isActive(): boolean { + return this.apis.length > 0; } - @computed get activeApis() { - return Array.from(this.subscribers.keys()); + @computed get apis(): string[] { + const { cluster, namespaceStore } = this; + const activeApis = Array.from(this.subscribers.keys()); + + return activeApis.map(api => { + if (!cluster.isAllowedResource(api.kind)) { + return []; + } + + if (api.isNamespaced) { + return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); + } else { + return api.getWatchUrl(); + } + }).flat(); + } + + constructor() { + this.init(); + } + + private async init() { + const { getHostedCluster } = await import("../../common/cluster-store"); + const { namespaceStore } = await import("../components/+namespaces/namespace.store"); + + await namespaceStore.whenReady; + + this.cluster = getHostedCluster(); + this.namespaceStore = namespaceStore; + this.bindAutoConnect(); + } + + private bindAutoConnect() { + const connect = debounce(() => this.connect(), 1000); + + reaction(() => this.apis, connect, { + fireImmediately: true, + equals: comparer.structural, + }); + + window.addEventListener("online", () => this.connect()); + window.addEventListener("offline", () => this.disconnect()); + setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m } getSubscribersCount(api: KubeApi) { return this.subscribers.get(api) || 0; } - subscribe(...apis: KubeApi[]) { + isAllowedApi(api: KubeApi): boolean { + return !!this?.cluster.isAllowedResource(api.kind); + } + + subscribeApi(api: KubeApi | KubeApi[]): () => void { + const apis: KubeApi[] = [api].flat(); + apis.forEach(api => { + if (!this.isAllowedApi(api)) return; // skip this.subscribers.set(api, this.getSubscribersCount(api) + 1); }); - return () => apis.forEach(api => { - const count = this.getSubscribersCount(api) - 1; + return () => { + apis.forEach(api => { + const count = this.getSubscribersCount(api) - 1; - if (count <= 0) this.subscribers.delete(api); - else this.subscribers.set(api, count); - }); - } - - // FIXME: use POST to send apis for subscribing (list could be huge) - // TODO: try to use normal fetch res.body stream to consume watch-api updates - // https://github.com/lensapp/lens/issues/1898 - protected async getQuery() { - const { namespaceStore } = await import("../components/+namespaces/namespace.store"); - - await namespaceStore.whenReady; - const { isAdmin } = getHostedCluster(); - - return { - api: this.activeApis.map(api => { - if (isAdmin && !api.isNamespaced) { - return api.getWatchUrl(); - } - - if (api.isNamespaced) { - return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); - } - - return []; - }).flat() + if (count <= 0) this.subscribers.delete(api); + else this.subscribers.set(api, count); + }); }; } - // todo: maybe switch to websocket to avoid often reconnects - @autobind() - protected async connect() { - if (this.evtSource) this.disconnect(); // close previous connection + subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { + const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; + const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages + const preloading: Promise[] = []; + const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); + const unsubscribeList: (() => void)[] = []; + let isUnsubscribed = false; - const query = await this.getQuery(); + const subscribe = () => { + if (isUnsubscribed) return; + apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); + }; + + if (preload) { + for (const store of stores) { + preloading.push(limitRequests(async () => { + if (cacheLoading && store.isLoaded) return; // skip + + return store.loadAll(); + })); + } + } + + if (waitUntilLoaded) { + Promise.all(preloading).then(subscribe, error => { + this.log({ + message: new Error("Loading stores has failed"), + meta: { stores, error, options }, + }); + }); + } else { + subscribe(); + } + + // unsubscribe + return () => { + if (isUnsubscribed) return; + isUnsubscribed = true; + limitRequests.clearQueue(); + unsubscribeList.forEach(unsubscribe => unsubscribe()); + }; + } + + protected async connectionCheck() { + if (!this.isConnected) { + this.log({ message: "Offline: reconnecting.." }); + await this.connect(); + } + + this.log({ + message: `Connection check: ${this.isConnected ? "online" : "offline"}`, + meta: { connected: this.isConnected }, + }); + } + + protected async connect(apis = this.apis) { + this.disconnect(); // close active connections first + + if (!navigator.onLine || !apis.length) { + this.isConnected = false; - if (!this.activeApis.length || !query.api.length) { return; } - const apiUrl = `${apiPrefix}/watch?${stringify(query)}`; + this.log({ + message: "Connecting", + meta: { apis } + }); - this.evtSource = new EventSource(apiUrl); - this.evtSource.onmessage = this.onMessage; - this.evtSource.onerror = this.onError; - this.writeLog("CONNECTING", query.api); - } + try { + const requestId = ++this.requestId; + const abortController = new AbortController(); - reconnect() { - if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) { - this.reconnectAttempts = this.maxReconnectsOnError; - this.connect(); + const request = await fetch(`${apiPrefix}/watch`, { + method: "POST", + body: JSON.stringify({ apis } as IWatchRoutePayload), + signal: abortController.signal, + headers: { + "content-type": "application/json" + } + }); + + // request above is stale since new request-id has been issued + if (this.requestId !== requestId) { + abortController.abort(); + + return; + } + + let jsonBuffer = ""; + const stream = request.body.pipeThrough(new TextDecoderStream()); + const reader = stream.getReader(); + + this.isConnected = true; + this.reader = reader; + + while (true) { + const { done, value } = await reader.read(); + + if (done) break; // exit + + const events = (jsonBuffer + value).split("\n"); + + jsonBuffer = this.processBuffer(events); + } + } catch (error) { + this.log({ message: error }); + } finally { + this.isConnected = false; } } protected disconnect() { - if (!this.evtSource) return; - this.evtSource.close(); - this.evtSource.onmessage = null; - this.evtSource = null; + this.reader?.cancel(); + this.reader = null; + this.isConnected = false; } - protected onMessage(evt: MessageEvent) { - if (!evt.data) return; - const data = JSON.parse(evt.data); + // process received stream events, returns unprocessed buffer chunk if any + protected processBuffer(events: string[]): string { + for (const json of events) { + try { + const kubeEvent: IKubeWatchEvent = JSON.parse(json); + const message = this.getMessage(kubeEvent); - if ((data as IKubeWatchEvent).object) { - this.onData.emit(data); - } else { - this.onRouteEvent(data); + this.onMessage.emit(message); + } catch (error) { + return json; + } } + + return ""; } - protected async onRouteEvent(event: IKubeWatchRouteEvent) { - if (event.type === "STREAM_END") { - this.disconnect(); - const { apiBase, namespace } = KubeApi.parseApi(event.url); - const api = apiManager.getApi(apiBase); + protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { + const message: IKubeWatchMessage = {}; - if (api) { - try { - await api.refreshResourceVersion({ namespace }); - this.reconnect(); - } catch (error) { - console.error("failed to refresh resource version", error); + switch (event.type) { + case "ADDED": + case "DELETED": - if (this.subscribers.size > 0) { - setTimeout(() => { - this.onRouteEvent(event); - }, 1000); - } + case "MODIFIED": { + const data = event as IKubeWatchEvent; + const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion); + + message.data = data; + + if (api) { + ensureObjectSelfLink(api, data.object); + + const { namespace, resourceVersion } = data.object.metadata; + + api.setResourceVersion(namespace, resourceVersion); + api.setResourceVersion("", resourceVersion); + + message.api = api; + message.store = apiManager.getStore(api); } + break; + } + + case "ERROR": + message.error = event as IKubeWatchEvent; + break; + + case "STREAM_END": { + this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, { + reconnectAttempts: 5, + timeout: 1000, + }); + break; + } + } + + return message; + } + + protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) { + const { apiBase, namespace } = parseKubeApi(event.url); + const api = apiManager.getApi(apiBase); + + if (!api) return; + + try { + await api.refreshResourceVersion({ namespace }); + this.connect(); + } catch (error) { + this.log({ + message: new Error(`Failed to connect on single stream end: ${error}`), + meta: { event, error }, + }); + + if (this.isActive && opts?.reconnectAttempts > 0) { + opts.reconnectAttempts--; + setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event } } } - protected onError(evt: MessageEvent) { - const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this; - - if (evt.eventPhase === EventSource.CLOSED) { - if (attemptsRemain > 0) { - this.reconnectAttempts--; - setTimeout(() => this.connect(), reconnectTimeoutMs); - } + protected log({ message, meta = {} }: IKubeWatchLog) { + if (isProduction && !isDebugging) { + return; } - } - protected writeLog(...data: any[]) { - if (isDevelopment) { - console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data); + const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`; + const isError = message instanceof Error; + const textStyle = `font-weight: bold;`; + const time = new Date().toLocaleString(); + + if (isError) { + console.error(logMessage, textStyle, { time, ...meta }); + } else { + console.info(logMessage, textStyle, { time, ...meta }); } } - - addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) { - const listener = (evt: IKubeWatchEvent) => { - if (evt.type === "ERROR") { - return; // e.g. evt.object.message == "too old resource version" - } - - const { namespace, resourceVersion } = evt.object.metadata; - const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion); - - api.setResourceVersion(namespace, resourceVersion); - api.setResourceVersion("", resourceVersion); - - ensureObjectSelfLink(api, evt.object); - - if (store == apiManager.getStore(api)) { - callback(evt); - } - }; - - this.onData.addListener(listener); - - return () => this.onData.removeListener(listener); - } - - reset() { - this.subscribers.clear(); - } } export const kubeWatchApi = new KubeWatchApi(); diff --git a/src/renderer/components/+cluster/cluster-overview.tsx b/src/renderer/components/+cluster/cluster-overview.tsx index 104c6fd022..1f7a7f6b78 100644 --- a/src/renderer/components/+cluster/cluster-overview.tsx +++ b/src/renderer/components/+cluster/cluster-overview.tsx @@ -3,13 +3,9 @@ import "./cluster-overview.scss"; import React from "react"; import { reaction } from "mobx"; import { disposeOnUnmount, observer } from "mobx-react"; - -import { eventStore } from "../+events/event.store"; import { nodesStore } from "../+nodes/nodes.store"; import { podsStore } from "../+workloads-pods/pods.store"; import { getHostedCluster } from "../../../common/cluster-store"; -import { isAllowedResource } from "../../../common/rbac"; -import { KubeObjectStore } from "../../kube-object.store"; import { interval } from "../../utils"; import { TabLayout } from "../layout/tab-layout"; import { Spinner } from "../spinner"; @@ -17,45 +13,33 @@ import { ClusterIssues } from "./cluster-issues"; import { ClusterMetrics } from "./cluster-metrics"; import { clusterOverviewStore } from "./cluster-overview.store"; import { ClusterPieCharts } from "./cluster-pie-charts"; +import { eventStore } from "../+events/event.store"; +import { kubeWatchApi } from "../../api/kube-watch-api"; @observer export class ClusterOverview extends React.Component { - private stores: KubeObjectStore[] = []; - private subscribers: Array<() => void> = []; - private metricPoller = interval(60, this.loadMetrics); - - @disposeOnUnmount - fetchMetrics = reaction( - () => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher - () => this.metricPoller.restart(true) - ); + private metricPoller = interval(60, () => this.loadMetrics()); loadMetrics() { getHostedCluster().available && clusterOverviewStore.loadMetrics(); } - async componentDidMount() { - if (isAllowedResource("nodes")) { - this.stores.push(nodesStore); - } + componentDidMount() { + this.metricPoller.start(true); - if (isAllowedResource("pods")) { - this.stores.push(podsStore); - } + disposeOnUnmount(this, [ + kubeWatchApi.subscribeStores([nodesStore, podsStore, eventStore], { + preload: true, + }), - if (isAllowedResource("events")) { - this.stores.push(eventStore); - } - - await Promise.all(this.stores.map(store => store.loadAll())); - this.loadMetrics(); - - this.subscribers = this.stores.map(store => store.subscribe()); - this.metricPoller.start(); + reaction( + () => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher + () => this.metricPoller.restart(true) + ), + ]); } componentWillUnmount() { - this.subscribers.forEach(dispose => dispose()); // unsubscribe all this.metricPoller.stop(); } diff --git a/src/renderer/components/+events/event.store.ts b/src/renderer/components/+events/event.store.ts index 3651ce1549..d6090be947 100644 --- a/src/renderer/components/+events/event.store.ts +++ b/src/renderer/components/+events/event.store.ts @@ -52,6 +52,10 @@ export class EventStore extends KubeObjectStore { return compact(eventsWithError); } + + getWarningsCount() { + return this.getWarnings().length; + } } export const eventStore = new EventStore(); diff --git a/src/renderer/components/+namespaces/namespace-select.tsx b/src/renderer/components/+namespaces/namespace-select.tsx index 079a9cd0b6..6ee7ea2d57 100644 --- a/src/renderer/components/+namespaces/namespace-select.tsx +++ b/src/renderer/components/+namespaces/namespace-select.tsx @@ -2,13 +2,14 @@ import "./namespace-select.scss"; import React from "react"; import { computed } from "mobx"; -import { observer } from "mobx-react"; +import { disposeOnUnmount, observer } from "mobx-react"; import { Select, SelectOption, SelectProps } from "../select"; -import { cssNames, noop } from "../../utils"; +import { cssNames } from "../../utils"; import { Icon } from "../icon"; import { namespaceStore } from "./namespace.store"; import { FilterIcon } from "../item-object-list/filter-icon"; import { FilterType } from "../item-object-list/page-filters.store"; +import { kubeWatchApi } from "../../api/kube-watch-api"; interface Props extends SelectProps { showIcons?: boolean; @@ -28,17 +29,13 @@ const defaultProps: Partial = { @observer export class NamespaceSelect extends React.Component { static defaultProps = defaultProps as object; - private unsubscribe = noop; - async componentDidMount() { - if (!namespaceStore.isLoaded) { - await namespaceStore.loadAll(); - } - this.unsubscribe = namespaceStore.subscribe(); - } - - componentWillUnmount() { - this.unsubscribe(); + componentDidMount() { + disposeOnUnmount(this, [ + kubeWatchApi.subscribeStores([namespaceStore], { + preload: true, + }) + ]); } @computed get options(): SelectOption[] { @@ -60,7 +57,7 @@ export class NamespaceSelect extends React.Component { return label || ( <> - {showIcons && } + {showIcons && } {value} ); @@ -103,9 +100,9 @@ export class NamespaceSelectFilter extends React.Component { return (
- + {namespace} - {isSelected && } + {isSelected && }
); }} diff --git a/src/renderer/components/+namespaces/namespace.store.ts b/src/renderer/components/+namespaces/namespace.store.ts index 50ec2c8038..63bb7525de 100644 --- a/src/renderer/components/+namespaces/namespace.store.ts +++ b/src/renderer/components/+namespaces/namespace.store.ts @@ -117,15 +117,15 @@ export class NamespaceStore extends KubeObjectStore { return namespaces; } - subscribe(apis = [this.api]) { + getSubscribeApis() { const { accessibleNamespaces } = getHostedCluster(); // if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted if (accessibleNamespaces.length > 0) { - return Function; // no-op + return []; } - return super.subscribe(apis); + return super.getSubscribeApis(); } protected async loadItems(params: KubeObjectStoreLoadingParams) { diff --git a/src/renderer/components/+network-services/service-details.tsx b/src/renderer/components/+network-services/service-details.tsx index 58cbe0a86e..80c9bd4dc7 100644 --- a/src/renderer/components/+network-services/service-details.tsx +++ b/src/renderer/components/+network-services/service-details.tsx @@ -1,17 +1,18 @@ import "./service-details.scss"; import React from "react"; -import { observer } from "mobx-react"; +import { disposeOnUnmount, observer } from "mobx-react"; import { DrawerItem, DrawerTitle } from "../drawer"; import { Badge } from "../badge"; import { KubeEventDetails } from "../+events/kube-event-details"; import { KubeObjectDetailsProps } from "../kube-object"; -import { Service, endpointApi } from "../../api/endpoints"; +import { Service } from "../../api/endpoints"; import { KubeObjectMeta } from "../kube-object/kube-object-meta"; import { ServicePortComponent } from "./service-port-component"; import { endpointStore } from "../+network-endpoints/endpoints.store"; import { ServiceDetailsEndpoint } from "./service-details-endpoint"; import { kubeObjectDetailRegistry } from "../../api/kube-object-detail-registry"; +import { kubeWatchApi } from "../../api/kube-watch-api"; interface Props extends KubeObjectDetailsProps { } @@ -19,10 +20,11 @@ interface Props extends KubeObjectDetailsProps { @observer export class ServiceDetails extends React.Component { componentDidMount() { - if (!endpointStore.isLoaded) { - endpointStore.loadAll(); - } - endpointApi.watch(); + disposeOnUnmount(this, [ + kubeWatchApi.subscribeStores([endpointStore], { + preload: true, + }), + ]); } render() { @@ -77,7 +79,7 @@ export class ServiceDetails extends React.Component { )} - + ); } diff --git a/src/renderer/components/+nodes/nodes.store.ts b/src/renderer/components/+nodes/nodes.store.ts index c0385b078b..b301015747 100644 --- a/src/renderer/components/+nodes/nodes.store.ts +++ b/src/renderer/components/+nodes/nodes.store.ts @@ -1,3 +1,4 @@ +import { sum } from "lodash"; import { action, computed, observable } from "mobx"; import { clusterApi, IClusterMetrics, INodeMetrics, Node, nodesApi } from "../../api/endpoints"; import { autobind } from "../../utils"; @@ -62,6 +63,10 @@ export class NodesStore extends KubeObjectStore { }); } + getWarningsCount(): number { + return sum(this.items.map((node: Node) => node.getWarningConditions().length)); + } + reset() { super.reset(); this.metrics = {}; diff --git a/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts b/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts index 71890acc44..620fbd86ac 100644 --- a/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts +++ b/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts @@ -9,8 +9,8 @@ import { apiManager } from "../../api/api-manager"; export class RoleBindingsStore extends KubeObjectStore { api = clusterRoleBindingApi; - subscribe() { - return super.subscribe([clusterRoleBindingApi, roleBindingApi]); + getSubscribeApis() { + return [clusterRoleBindingApi, roleBindingApi]; } protected sortItems(items: RoleBinding[]) { diff --git a/src/renderer/components/+user-management-roles/roles.store.ts b/src/renderer/components/+user-management-roles/roles.store.ts index 7d2e90dd38..82b0e66612 100644 --- a/src/renderer/components/+user-management-roles/roles.store.ts +++ b/src/renderer/components/+user-management-roles/roles.store.ts @@ -7,8 +7,8 @@ import { apiManager } from "../../api/api-manager"; export class RolesStore extends KubeObjectStore { api = clusterRoleApi; - subscribe() { - return super.subscribe([roleApi, clusterRoleApi]); + getSubscribeApis() { + return [roleApi, clusterRoleApi]; } protected sortItems(items: Role[]) { diff --git a/src/renderer/components/+workloads-overview/overview.tsx b/src/renderer/components/+workloads-overview/overview.tsx index 351b57462c..50a25ef87c 100644 --- a/src/renderer/components/+workloads-overview/overview.tsx +++ b/src/renderer/components/+workloads-overview/overview.tsx @@ -1,8 +1,7 @@ import "./overview.scss"; import React from "react"; -import { observable, when } from "mobx"; -import { observer } from "mobx-react"; +import { disposeOnUnmount, observer } from "mobx-react"; import { OverviewStatuses } from "./overview-statuses"; import { RouteComponentProps } from "react-router"; import { IWorkloadsOverviewRouteParams } from "../+workloads"; @@ -15,60 +14,23 @@ import { replicaSetStore } from "../+workloads-replicasets/replicasets.store"; import { jobStore } from "../+workloads-jobs/job.store"; import { cronJobStore } from "../+workloads-cronjobs/cronjob.store"; import { Events } from "../+events"; -import { KubeObjectStore } from "../../kube-object.store"; import { isAllowedResource } from "../../../common/rbac"; -import { namespaceStore } from "../+namespaces/namespace.store"; +import { kubeWatchApi } from "../../api/kube-watch-api"; interface Props extends RouteComponentProps { } @observer export class WorkloadsOverview extends React.Component { - @observable isLoading = false; - @observable isUnmounting = false; - - async componentDidMount() { - const stores: KubeObjectStore[] = [ - isAllowedResource("pods") && podsStore, - isAllowedResource("deployments") && deploymentStore, - isAllowedResource("daemonsets") && daemonSetStore, - isAllowedResource("statefulsets") && statefulSetStore, - isAllowedResource("replicasets") && replicaSetStore, - isAllowedResource("jobs") && jobStore, - isAllowedResource("cronjobs") && cronJobStore, - isAllowedResource("events") && eventStore, - ].filter(Boolean); - - const unsubscribeMap = new Map void>(); - - const loadStores = async () => { - this.isLoading = true; - - for (const store of stores) { - if (this.isUnmounting) break; - - try { - await store.loadAll(); - unsubscribeMap.get(store)?.(); // unsubscribe previous watcher - unsubscribeMap.set(store, store.subscribe()); - } catch (error) { - console.error("loading store error", error); - } - } - this.isLoading = false; - }; - - namespaceStore.onContextChange(loadStores, { - fireImmediately: true, - }); - - await when(() => this.isUnmounting && !this.isLoading); - unsubscribeMap.forEach(dispose => dispose()); - unsubscribeMap.clear(); - } - - componentWillUnmount() { - this.isUnmounting = true; + componentDidMount() { + disposeOnUnmount(this, [ + kubeWatchApi.subscribeStores([ + podsStore, deploymentStore, daemonSetStore, statefulSetStore, replicaSetStore, + jobStore, cronJobStore, eventStore, + ], { + preload: true, + }), + ]); } render() { diff --git a/src/renderer/components/app.tsx b/src/renderer/components/app.tsx index 958ab4b73d..767a905e4a 100755 --- a/src/renderer/components/app.tsx +++ b/src/renderer/components/app.tsx @@ -1,5 +1,5 @@ import React from "react"; -import { observer } from "mobx-react"; +import { disposeOnUnmount, observer } from "mobx-react"; import { Redirect, Route, Router, Switch } from "react-router"; import { history } from "../navigation"; import { Notifications } from "./notifications"; @@ -42,10 +42,10 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte import { TabLayout, TabLayoutRoute } from "./layout/tab-layout"; import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog"; import { eventStore } from "./+events/event.store"; -import { reaction, computed, observable } from "mobx"; +import { computed, reaction, observable } from "mobx"; import { nodesStore } from "./+nodes/nodes.store"; import { podsStore } from "./+workloads-pods/pods.store"; -import { sum } from "lodash"; +import { kubeWatchApi } from "../api/kube-watch-api"; import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog"; @observer @@ -75,50 +75,26 @@ export class App extends React.Component { whatInput.ask(); // Start to monitor user input device } - @observable extensionRoutes: Map = new Map(); + componentDidMount() { + disposeOnUnmount(this, [ + kubeWatchApi.subscribeStores([podsStore, nodesStore, eventStore], { + preload: true, + }), - async componentDidMount() { - const cluster = getHostedCluster(); - const promises: Promise[] = []; + reaction(() => this.warningsTotal, (count: number) => { + broadcastMessage(`cluster-warning-event-count:${getHostedCluster().id}`, count); + }), - if (isAllowedResource("events") && isAllowedResource("pods")) { - promises.push(eventStore.loadAll()); - promises.push(podsStore.loadAll()); - } - - if (isAllowedResource("nodes")) { - promises.push(nodesStore.loadAll()); - } - await Promise.all(promises); - - if (eventStore.isLoaded && podsStore.isLoaded) { - eventStore.subscribe(); - podsStore.subscribe(); - } - - if (nodesStore.isLoaded) { - nodesStore.subscribe(); - } - - reaction(() => this.warningsCount, (count) => { - broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count); - }); - - reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => { - this.generateExtensionTabLayoutRoutes(rootItems); - }, { - fireImmediately: true - }); + reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => { + this.generateExtensionTabLayoutRoutes(rootItems); + }, { + fireImmediately: true + }) + ]); } - @computed - get warningsCount() { - let warnings = sum(nodesStore.items - .map(node => node.getWarningConditions().length)); - - warnings = warnings + eventStore.getWarnings().length; - - return warnings; + @computed get warningsTotal(): number { + return nodesStore.getWarningsCount() + eventStore.getWarningsCount(); } get startURL() { @@ -151,6 +127,26 @@ export class App extends React.Component { return routes; } + renderExtensionTabLayoutRoutes() { + return clusterPageMenuRegistry.getRootItems().map((menu, index) => { + const tabRoutes = this.getTabLayoutRoutes(menu); + + if (tabRoutes.length > 0) { + const pageComponent = () => ; + + return tab.routePath)}/>; + } else { + const page = clusterPageRegistry.getByPageTarget(menu.target); + + if (page) { + return ; + } + } + }); + } + + @observable extensionRoutes: Map = new Map(); + generateExtensionTabLayoutRoutes(rootItems: ClusterPageMenuRegistration[]) { rootItems.forEach((menu, index) => { let route = this.extensionRoutes.get(menu); @@ -181,10 +177,6 @@ export class App extends React.Component { } } - renderExtensionTabLayoutRoutes() { - return Array.from(this.extensionRoutes.values()); - } - renderExtensionRoutes() { return clusterPageRegistry.getItems().map((page, index) => { const menu = clusterPageMenuRegistry.getByPage(page); diff --git a/src/renderer/components/item-object-list/item-list-layout.tsx b/src/renderer/components/item-object-list/item-list-layout.tsx index 6b4ff4fd16..b13d496064 100644 --- a/src/renderer/components/item-object-list/item-list-layout.tsx +++ b/src/renderer/components/item-object-list/item-list-layout.tsx @@ -2,7 +2,7 @@ import "./item-list-layout.scss"; import groupBy from "lodash/groupBy"; import React, { ReactNode } from "react"; -import { computed, IReactionDisposer, observable, reaction, toJS } from "mobx"; +import { computed, observable, reaction, toJS } from "mobx"; import { disposeOnUnmount, observer } from "mobx-react"; import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog"; import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table"; @@ -12,7 +12,6 @@ import { NoItems } from "../no-items"; import { Spinner } from "../spinner"; import { ItemObject, ItemStore } from "../../item.store"; import { SearchInputUrl } from "../input"; -import { namespaceStore } from "../+namespaces/namespace.store"; import { Filter, FilterType, pageFilters } from "./page-filters.store"; import { PageFiltersList } from "./page-filters-list"; import { PageFiltersSelect } from "./page-filters-select"; @@ -22,6 +21,7 @@ import { MenuActions } from "../menu/menu-actions"; import { MenuItem } from "../menu"; import { Checkbox } from "../checkbox"; import { userStore } from "../../../common/user-store"; +import { namespaceStore } from "../+namespaces/namespace.store"; // todo: refactor, split to small re-usable components @@ -40,6 +40,7 @@ export interface ItemListLayoutProps { className: IClassName; store: ItemStore; dependentStores?: ItemStore[]; + preloadStores?: boolean; isClusterScoped?: boolean; hideFilters?: boolean; searchFilters?: SearchFilter[]; @@ -82,6 +83,7 @@ const defaultProps: Partial = { isSelectable: true, isConfigurable: false, copyClassNameFromHeadCells: true, + preloadStores: true, dependentStores: [], filterItems: [], hasDetailsView: true, @@ -97,10 +99,6 @@ interface ItemListLayoutUserSettings { export class ItemListLayout extends React.Component { static defaultProps = defaultProps as object; - private watchDisposers: IReactionDisposer[] = []; - - @observable isUnmounting = false; - @observable userSettings: ItemListLayoutUserSettings = { showAppliedFilters: false, }; @@ -119,54 +117,28 @@ export class ItemListLayout extends React.Component { } async componentDidMount() { - const { isClusterScoped, isConfigurable, tableId } = this.props; + const { isClusterScoped, isConfigurable, tableId, preloadStores } = this.props; if (isConfigurable && !tableId) { throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified"); } - this.loadStores(); + if (preloadStores) { + this.loadStores(); - if (!isClusterScoped) { - disposeOnUnmount(this, [ - namespaceStore.onContextChange(() => this.loadStores()) - ]); + if (!isClusterScoped) { + disposeOnUnmount(this, [ + namespaceStore.onContextChange(() => this.loadStores()) + ]); + } } } - async componentWillUnmount() { - this.isUnmounting = true; - this.unsubscribeStores(); - } - - @computed get stores() { + private loadStores() { const { store, dependentStores } = this.props; + const stores = Array.from(new Set([store, ...dependentStores])); - return new Set([store, ...dependentStores]); - } - - async loadStores() { - this.unsubscribeStores(); // reset first - - // load - for (const store of this.stores) { - if (this.isUnmounting) { - this.unsubscribeStores(); - break; - } - - try { - await store.loadAll(); - this.watchDisposers.push(store.subscribe()); - } catch (error) { - console.error("loading store error", error); - } - } - } - - unsubscribeStores() { - this.watchDisposers.forEach(dispose => dispose()); - this.watchDisposers.length = 0; + stores.forEach(store => store.loadAll()); } private filterCallbacks: { [type: string]: ItemsFilter } = { diff --git a/src/renderer/components/kube-object/kube-object-list-layout.tsx b/src/renderer/components/kube-object/kube-object-list-layout.tsx index 25922f0f72..226023fc8d 100644 --- a/src/renderer/components/kube-object/kube-object-list-layout.tsx +++ b/src/renderer/components/kube-object/kube-object-list-layout.tsx @@ -1,15 +1,17 @@ import React from "react"; import { computed } from "mobx"; -import { observer } from "mobx-react"; +import { disposeOnUnmount, observer } from "mobx-react"; import { cssNames } from "../../utils"; import { KubeObject } from "../../api/kube-object"; import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout"; import { KubeObjectStore } from "../../kube-object.store"; import { KubeObjectMenu } from "./kube-object-menu"; import { kubeSelectedUrlParam, showDetails } from "./kube-object-details"; +import { kubeWatchApi } from "../../api/kube-watch-api"; export interface KubeObjectListLayoutProps extends ItemListLayoutProps { store: KubeObjectStore; + dependentStores?: KubeObjectStore[]; } @observer @@ -18,6 +20,17 @@ export class KubeObjectListLayout extends React.Component { if (this.props.onDetails) { this.props.onDetails(item); @@ -33,6 +46,7 @@ export class KubeObjectListLayout extends React.Component { diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 8a75bc7ae6..760ebd3335 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -2,10 +2,10 @@ import type { Cluster } from "../main/cluster"; import { action, observable, reaction } from "mobx"; import { autobind } from "./utils"; import { KubeObject } from "./api/kube-object"; -import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api"; +import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; -import { IKubeApiQueryParams, KubeApi } from "./api/kube-api"; +import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api"; import { KubeJsonApiData } from "./api/kube-json-api"; export interface KubeObjectStoreLoadingParams { @@ -22,7 +22,6 @@ export abstract class KubeObjectStore extends ItemSt constructor() { super(); this.bindWatchEventsUpdater(); - kubeWatchApi.addListener(this, this.onWatchApiEvent); } get query(): IKubeApiQueryParams { @@ -157,7 +156,7 @@ export abstract class KubeObjectStore extends ItemSt @action async loadFromPath(resourcePath: string) { - const { namespace, name } = KubeApi.parseApi(resourcePath); + const { namespace, name } = parseKubeApi(resourcePath); return this.load({ name, namespace }); } @@ -195,29 +194,29 @@ export abstract class KubeObjectStore extends ItemSt } // collect items from watch-api events to avoid UI blowing up with huge streams of data - protected eventsBuffer = observable>([], { deep: false }); + protected eventsBuffer = observable.array>([], { deep: false }); protected bindWatchEventsUpdater(delay = 1000) { - return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, { + kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage) => { + if (!this.isLoaded || store !== this) return; + this.eventsBuffer.push(data); + }); + + reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, { delay }); } - subscribe(apis = [this.api]) { - return KubeApi.watchAll(...apis); + getSubscribeApis(): KubeApi[] { + return [this.api]; } - protected onWatchApiEvent(evt: IKubeWatchEvent) { - if (!this.isLoaded) return; - this.eventsBuffer.push(evt); + subscribe(apis = this.getSubscribeApis()) { + return kubeWatchApi.subscribeApi(apis); } @action protected updateFromEventsBuffer() { - if (!this.eventsBuffer.length) { - return; - } - // create latest non-observable copy of items to apply updates in one action (==single render) const items = this.items.toJS(); for (const { type, object } of this.eventsBuffer.clear()) {