From 701259695c3f2955c223d98477bd3383642df3ed Mon Sep 17 00:00:00 2001 From: Jari Kolehmainen Date: Mon, 8 Feb 2021 16:51:02 +0200 Subject: [PATCH] use native k8s api watches Signed-off-by: Jari Kolehmainen --- package.json | 3 + src/main/router.ts | 5 +- src/main/routes/index.ts | 1 - src/main/routes/watch-route.ts | 162 ----------- src/renderer/api/json-api.ts | 14 + src/renderer/api/kube-api.ts | 88 +++++- src/renderer/api/kube-watch-api.ts | 264 +----------------- .../+workloads-overview/overview-statuses.tsx | 2 - .../item-object-list/item-list-layout.tsx | 10 +- .../item-object-list/page-filters.store.ts | 28 -- src/renderer/item.store.ts | 4 + src/renderer/kube-object.store.ts | 40 ++- yarn.lock | 33 ++- 13 files changed, 183 insertions(+), 471 deletions(-) delete mode 100644 src/main/routes/watch-route.ts diff --git a/package.json b/package.json index 6d42bd99be..b0cfd36645 100644 --- a/package.json +++ b/package.json @@ -189,6 +189,7 @@ "@kubernetes/client-node": "^0.12.0", "array-move": "^3.0.0", "await-lock": "^2.1.0", + "byline": "^5.0.0", "chalk": "^4.1.0", "chokidar": "^3.4.3", "command-exists": "1.2.9", @@ -221,6 +222,7 @@ "react": "^17.0.1", "react-dom": "^17.0.1", "react-router": "^5.2.0", + "readable-web-to-node-stream": "^3.0.1", "request": "^2.88.2", "request-promise-native": "^1.0.8", "semver": "^7.3.2", @@ -242,6 +244,7 @@ "@pmmmwh/react-refresh-webpack-plugin": "^0.4.3", "@testing-library/jest-dom": "^5.11.5", "@testing-library/react": "^11.1.0", + "@types/byline": "^4.2.32", "@types/chart.js": "^2.9.21", "@types/circular-dependency-plugin": "^5.0.1", "@types/color": "^3.0.1", diff --git a/src/main/router.ts b/src/main/router.ts index 875bd319b5..bb49aacdab 100644 --- a/src/main/router.ts +++ b/src/main/router.ts @@ -5,7 +5,7 @@ import path from "path"; import { readFile } from "fs-extra"; import { Cluster } from "./cluster"; import { apiPrefix, appName, publicPath, isDevelopment, webpackDevServerPort } from "../common/vars"; -import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, watchRoute, versionRoute } from "./routes"; +import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, versionRoute } from "./routes"; import logger from "./logger"; export interface RouterRequestOpts { @@ -146,9 +146,6 @@ export class Router { this.router.add({ method: "get", path: "/version"}, versionRoute.getVersion.bind(versionRoute)); this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute)); - // Watch API - this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); - // Metrics API this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute)); diff --git a/src/main/routes/index.ts b/src/main/routes/index.ts index c2fd222631..c194d8f8b2 100644 --- a/src/main/routes/index.ts +++ b/src/main/routes/index.ts @@ -1,7 +1,6 @@ export * from "./kubeconfig-route"; export * from "./metrics-route"; export * from "./port-forward-route"; -export * from "./watch-route"; export * from "./helm-route"; export * from "./resource-applier-route"; export * from "./version-route"; diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts deleted file mode 100644 index 2c86314908..0000000000 --- a/src/main/routes/watch-route.ts +++ /dev/null @@ -1,162 +0,0 @@ -import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api"; - -import plimit from "p-limit"; -import { delay } from "../../common/utils"; -import { LensApiRequest } from "../router"; -import { LensApi } from "../lens-api"; -import { KubeConfig, Watch } from "@kubernetes/client-node"; -import { ServerResponse } from "http"; -import { Request } from "request"; -import logger from "../logger"; - -export interface IKubeWatchEvent { - type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END"; - object?: T; -} - -export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent { - type: "STREAM_END"; - url: string; - status: number; -} - -export interface IWatchRoutePayload { - apis: string[]; // kube-api url list for subscribing to watch events -} - -class ApiWatcher { - private apiUrl: string; - private response: ServerResponse; - private watchRequest: Request; - private watch: Watch; - private processor: NodeJS.Timeout; - private eventBuffer: any[] = []; - - constructor(apiUrl: string, kubeConfig: KubeConfig, response: ServerResponse) { - this.apiUrl = apiUrl; - this.watch = new Watch(kubeConfig); - this.response = response; - } - - public async start() { - if (this.processor) { - clearInterval(this.processor); - } - this.processor = setInterval(() => { - if (this.response.finished) return; - const events = this.eventBuffer.splice(0); - - events.map(event => this.sendEvent(event)); - this.response.flushHeaders(); - }, 50); - this.watchRequest = await this.watch.watch(this.apiUrl, {}, this.watchHandler.bind(this), this.doneHandler.bind(this)); - } - - public stop() { - if (!this.watchRequest) { - return; - } - - if (this.processor) { - clearInterval(this.processor); - } - logger.debug(`Stopping watcher for api: ${this.apiUrl}`); - - try { - this.watchRequest.abort(); - - const event: IKubeWatchEventStreamEnd = { - type: "STREAM_END", - url: this.apiUrl, - status: 410, - }; - - this.sendEvent(event); - logger.debug("watch aborted"); - } catch (error) { - logger.error(`Watch abort errored:${error}`); - } - } - - private watchHandler(phase: string, obj: any) { - this.eventBuffer.push({ - type: phase, - object: obj - }); - } - - private doneHandler(error: Error) { - if (error) logger.warn(`watch ended: ${error.toString()}`); - this.watchRequest.abort(); - } - - private sendEvent(evt: IKubeWatchEvent) { - this.response.write(`${JSON.stringify(evt)}\n`); - } -} - -class WatchRoute extends LensApi { - private response: ServerResponse; - - private setResponse(response: ServerResponse) { - // clean up previous connection and stop all corresponding watch-api requests - // otherwise it happens only by request timeout or something else.. - this.response?.destroy(); - this.response = response; - } - - public async routeWatch(request: LensApiRequest) { - const { response, cluster, payload: { apis } = {} } = request; - - if (!apis?.length) { - this.respondJson(response, { - message: "watch apis list is empty" - }, 400); - - return; - } - - this.setResponse(response); - response.setHeader("Content-Type", "application/json"); - response.setHeader("Cache-Control", "no-cache"); - response.setHeader("Connection", "keep-alive"); - logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); - - // limit concurrent k8s requests to avoid possible ECONNRESET-error - const requests = plimit(5); - const watchers = new Map(); - let isWatchRequestEnded = false; - - apis.forEach(apiUrl => { - const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); - - watchers.set(apiUrl, watcher); - - requests(async () => { - if (isWatchRequestEnded) return; - await watcher.start(); - await delay(100); - }); - }); - - function onRequestEnd() { - if (isWatchRequestEnded) return; - isWatchRequestEnded = true; - requests.clearQueue(); - watchers.forEach(watcher => watcher.stop()); - watchers.clear(); - } - - request.raw.req.on("end", () => { - logger.info("Watch request end"); - onRequestEnd(); - }); - - request.raw.req.on("close", () => { - logger.info("Watch request close"); - onRequestEnd(); - }); - } -} - -export const watchRoute = new WatchRoute(); diff --git a/src/renderer/api/json-api.ts b/src/renderer/api/json-api.ts index 49c2cb1a28..cfd00e9646 100644 --- a/src/renderer/api/json-api.ts +++ b/src/renderer/api/json-api.ts @@ -55,6 +55,20 @@ export class JsonApi { return this.request(path, params, { ...reqInit, method: "get" }); } + getReadableStream(path: string, params?: P, init: RequestInit = {}): Promise { + let reqUrl = this.config.apiBase + path; + const reqInit: RequestInit = { ...init }; + const { query } = params || {} as P; + + if (query) { + const queryString = stringify(query); + + reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString; + } + + return fetch(reqUrl, reqInit); + } + post(path: string, params?: P, reqInit: RequestInit = {}) { return this.request(path, params, { ...reqInit, method: "post" }); } diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index e62603b14f..c0b6f00884 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -9,7 +9,9 @@ import { apiKube } from "./index"; import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api"; import { IKubeObjectConstructor, KubeObject } from "./kube-object"; -import { kubeWatchApi } from "./kube-watch-api"; +import byline from "byline"; +import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; +import { IKubeWatchEvent, IKubeWatchMessage } from "./kube-watch-api"; export interface IKubeApiOptions { /** @@ -91,6 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) { } } +type KubeApiWatchOptions = { + namespace: string; + callback?: (data: IKubeWatchEvent) => void; +}; + export class KubeApi { readonly kind: string; readonly apiBase: string; @@ -104,6 +111,7 @@ export class KubeApi { public objectConstructor: IKubeObjectConstructor; protected request: KubeJsonApi; protected resourceVersions = new Map(); + protected watchDisposer: () => void; constructor(protected options: IKubeApiOptions) { const { @@ -357,8 +365,82 @@ export class KubeApi { }); } - watch(): () => void { - return kubeWatchApi.subscribeApi(this); + watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void { + const { namespace, callback } = opts; + const watchUrl = this.getWatchUrl(namespace); + const abortController = new AbortController(); + const responsePromise = this.request.getReadableStream(watchUrl, null, { signal: abortController.signal }); + let disposed = false; + + responsePromise.then((response) => { + const nodeStream = new ReadableWebToNodeStream(response.body); + const stream = byline(nodeStream); + + stream.on("data", (line) => { + try { + const data: IKubeWatchEvent = JSON.parse(line); + + console.log("data", data); + + if (callback) { + callback(data); + } + } catch (ignore) { + // ignore parse errors + } + }); + + stream.on("close", () => { + setTimeout(() => { + if (!disposed) this.watch({namespace, callback}); + }, 1000); + }); + + stream.on("error", (error) => { + console.error("stream error", error); + }); + }, (error) => { + if (error instanceof DOMException) return; // AbortController rejects, we can ignore it + + console.error("watch rejected", error); + }).catch((error) => { + console.error("watch error", error); + }); + + const disposer = () => { + disposed = true; + abortController.abort(); + }; + + return disposer; + } + + protected generateMessage(event: IKubeWatchEvent): IKubeWatchMessage { + const message: IKubeWatchMessage = {}; + + switch (event.type) { + case "ADDED": + case "DELETED": + + case "MODIFIED": { + const data = event as IKubeWatchEvent; + + message.data = data; + + ensureObjectSelfLink(this, data.object); + + const { namespace, resourceVersion } = data.object.metadata; + + this.setResourceVersion(namespace, resourceVersion); + this.setResourceVersion("", resourceVersion); + + message.api = this; + message.namespace = namespace; + break; + } + } + + return message; } } diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 577124289d..6ec4c03489 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -1,21 +1,21 @@ // Kubernetes watch-api client // API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams -import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; import type { KubeObject } from "./kube-object"; import type { KubeObjectStore } from "../kube-object.store"; import type { ClusterContext } from "../components/context"; import plimit from "p-limit"; -import debounce from "lodash/debounce"; -import { comparer, computed, IReactionDisposer, observable, reaction, when } from "mobx"; -import { autobind, EventEmitter, noop } from "../utils"; -import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; +import { comparer, IReactionDisposer, observable, reaction, when } from "mobx"; +import { autobind, noop } from "../utils"; +import { KubeApi } from "./kube-api"; import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; -import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; -import { apiManager } from "./api-manager"; +import { isDebugging, isProduction } from "../../common/vars"; -export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; +export interface IKubeWatchEvent { + type: "ADDED" | "MODIFIED" | "DELETED"; + object?: T; +} export interface IKubeWatchMessage { namespace?: string; @@ -32,11 +32,6 @@ export interface IKubeWatchSubscribeStoreOptions { loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false } -export interface IKubeWatchReconnectOptions { - reconnectAttempts: number; - timeout: number; -} - export interface IKubeWatchLog { message: string | string[] | Error; meta?: object; @@ -45,87 +40,24 @@ export interface IKubeWatchLog { @autobind() export class KubeWatchApi { - private requestId = 0; - private reader: ReadableStreamReader; - public onMessage = new EventEmitter<[IKubeWatchMessage]>(); - @observable context: ClusterContext = null; @observable subscribers = observable.map(); @observable isConnected = false; contextReady = when(() => Boolean(this.context)); - @computed get isActive(): boolean { - return this.apis.length > 0; - } - - @computed get apis(): string[] { - if (!this.context) { - return []; - } - - return Array.from(this.subscribers.keys()).map(api => { - if (!this.isAllowedApi(api)) { - return []; - } - - // TODO: optimize - check when all namespaces are selected and then request all in one - if (api.isNamespaced && !this.context.cluster.isGlobalWatchEnabled) { - return this.context.contextNamespaces.map(namespace => api.getWatchUrl(namespace)); - } - - return api.getWatchUrl(); - }).flat(); - } - constructor() { this.init(); } private async init() { await this.contextReady; - this.bindAutoConnect(); - } - - private bindAutoConnect() { - const connect = debounce(() => this.connect(), 1000); - - reaction(() => this.apis, connect, { - fireImmediately: true, - equals: comparer.structural, - }); - - window.addEventListener("online", () => this.connect()); - window.addEventListener("offline", () => this.disconnect()); - setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m - } - - getSubscribersCount(api: KubeApi) { - return this.subscribers.get(api) || 0; } isAllowedApi(api: KubeApi): boolean { return Boolean(this.context?.cluster.isAllowedResource(api.kind)); } - subscribeApi(api: KubeApi | KubeApi[]): () => void { - const apis: KubeApi[] = [api].flat(); - - apis.forEach(api => { - if (!this.isAllowedApi(api)) return; // skip - this.subscribers.set(api, this.getSubscribersCount(api) + 1); - }); - - return () => { - apis.forEach(api => { - const count = this.getSubscribersCount(api) - 1; - - if (count <= 0) this.subscribers.delete(api); - else this.subscribers.set(api, count); - }); - }; - } - preloadStores(stores: KubeObjectStore[], opts: { namespaces?: string[], loadOnce?: boolean } = {}) { const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages const preloading: Promise[] = []; @@ -146,7 +78,6 @@ export class KubeWatchApi { subscribeStores(stores: KubeObjectStore[], opts: IKubeWatchSubscribeStoreOptions = {}): () => void { const { preload = true, waitUntilLoaded = true, loadOnce = false, } = opts; - const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); const subscribingNamespaces = opts.namespaces ?? this.context?.allNamespaces ?? []; const unsubscribeList: Function[] = []; let isUnsubscribed = false; @@ -157,7 +88,10 @@ export class KubeWatchApi { const subscribe = () => { if (isUnsubscribed) return; - apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); + + stores.forEach((store) => { + unsubscribeList.push(store.subscribe()); + }); }; if (preloading) { @@ -191,180 +125,6 @@ export class KubeWatchApi { }; } - protected async connectionCheck() { - if (!this.isConnected) { - this.log({ message: "Offline: reconnecting.." }); - await this.connect(); - } - - this.log({ - message: `Connection check: ${this.isConnected ? "online" : "offline"}`, - meta: { connected: this.isConnected }, - }); - } - - protected async connect(apis = this.apis) { - this.disconnect(); // close active connections first - - if (!navigator.onLine || !apis.length) { - this.isConnected = false; - - return; - } - - this.log({ - message: "Connecting", - meta: { apis } - }); - - try { - const requestId = ++this.requestId; - const abortController = new AbortController(); - - const request = await fetch(`${apiPrefix}/watch`, { - method: "POST", - body: JSON.stringify({ apis } as IWatchRoutePayload), - signal: abortController.signal, - headers: { - "content-type": "application/json" - } - }); - - // request above is stale since new request-id has been issued - if (this.requestId !== requestId) { - abortController.abort(); - - return; - } - - let jsonBuffer = ""; - const stream = request.body.pipeThrough(new TextDecoderStream()); - const reader = stream.getReader(); - - this.isConnected = true; - this.reader = reader; - - while (true) { - const { done, value } = await reader.read(); - - if (done) break; // exit - - const events = (jsonBuffer + value).trim().split("\n"); - - jsonBuffer = this.processBuffer(events); - } - } catch (error) { - this.log({ message: error }); - } finally { - this.isConnected = false; - } - } - - protected disconnect() { - this.reader?.cancel(); - this.reader = null; - this.isConnected = false; - } - - // process received stream events, returns unprocessed buffer chunk if any - protected processBuffer(events: string[]): string { - for (const json of events) { - try { - const kubeEvent: IKubeWatchEvent = JSON.parse(json); - const message = this.getMessage(kubeEvent); - const { data, namespace } = message; - - // log all data events - if (data) { - this.log({ - message: `[${data.type}] ${data.object.kind} in ${namespace || "(cluster)"}`, - meta: data, - cssStyle: `color: ${[ - data.type === "ADDED" && "green", - data.type === "MODIFIED" && "darkgray", - data.type === "DELETED" && "red", - ].filter(Boolean)};`, - }); - } - - // skip updates from non-watching resources context - if (!namespace || this.context?.contextNamespaces.includes(namespace)) { - this.onMessage.emit(message); - } - } catch (error) { - return json; - } - } - - return ""; - } - - protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { - const message: IKubeWatchMessage = {}; - - switch (event.type) { - case "ADDED": - case "DELETED": - - case "MODIFIED": { - const data = event as IKubeWatchEvent; - const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion); - - message.data = data; - - if (api) { - ensureObjectSelfLink(api, data.object); - - const { namespace, resourceVersion } = data.object.metadata; - - api.setResourceVersion(namespace, resourceVersion); - api.setResourceVersion("", resourceVersion); - - message.api = api; - message.store = apiManager.getStore(api); - message.namespace = namespace; - } - break; - } - - case "ERROR": - message.error = event as IKubeWatchEvent; - break; - - case "STREAM_END": { - this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, { - reconnectAttempts: 5, - timeout: 1000, - }); - break; - } - } - - return message; - } - - protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) { - const { apiBase, namespace } = parseKubeApi(event.url); - const api = apiManager.getApi(apiBase); - - if (!api) return; - - try { - await api.refreshResourceVersion({ namespace }); - this.connect(); - } catch (error) { - this.log({ - message: new Error(`Failed to connect on single stream end: ${error}`), - meta: { event, error }, - }); - - if (this.isActive && opts?.reconnectAttempts > 0) { - opts.reconnectAttempts--; - setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event - } - } - } - protected log({ message, cssStyle = "", meta = {} }: IKubeWatchLog) { if (isProduction && !isDebugging) { return; diff --git a/src/renderer/components/+workloads-overview/overview-statuses.tsx b/src/renderer/components/+workloads-overview/overview-statuses.tsx index 082f24db20..6274a94f30 100644 --- a/src/renderer/components/+workloads-overview/overview-statuses.tsx +++ b/src/renderer/components/+workloads-overview/overview-statuses.tsx @@ -6,7 +6,6 @@ import { OverviewWorkloadStatus } from "./overview-workload-status"; import { Link } from "react-router-dom"; import { workloadURL, workloadStores } from "../+workloads"; import { namespaceStore } from "../+namespaces/namespace.store"; -import { PageFiltersList } from "../item-object-list/page-filters-list"; import { NamespaceSelectFilter } from "../+namespaces/namespace-select"; import { isAllowedResource, KubeResource } from "../../../common/rbac"; import { ResourceNames } from "../../../renderer/utils/rbac"; @@ -50,7 +49,6 @@ export class OverviewStatuses extends React.Component {
Overview
-
{workloads}
diff --git a/src/renderer/components/item-object-list/item-list-layout.tsx b/src/renderer/components/item-object-list/item-list-layout.tsx index 3dbb16956a..5eeb7b2f11 100644 --- a/src/renderer/components/item-object-list/item-list-layout.tsx +++ b/src/renderer/components/item-object-list/item-list-layout.tsx @@ -181,11 +181,7 @@ export class ItemListLayout extends React.Component { @computed get filters() { let { activeFilters } = pageFilters; - const { isClusterScoped, isSearchable, searchFilters } = this.props; - - if (isClusterScoped) { - activeFilters = activeFilters.filter(({ type }) => type !== FilterType.NAMESPACE); - } + const { isSearchable, searchFilters } = this.props; if (!(isSearchable && searchFilters)) { activeFilters = activeFilters.filter(({ type }) => type !== FilterType.SEARCH); @@ -341,8 +337,8 @@ export class ItemListLayout extends React.Component { } renderInfo() { - const { allItems, items, isReady, userSettings, filters } = this; - const allItemsCount = allItems.length; + const { items, isReady, userSettings, filters } = this; + const allItemsCount = this.props.store.getTotalCount(); const itemsCount = items.length; const isFiltered = isReady && filters.length > 0; diff --git a/src/renderer/components/item-object-list/page-filters.store.ts b/src/renderer/components/item-object-list/page-filters.store.ts index 57651e6a26..933a94c06b 100644 --- a/src/renderer/components/item-object-list/page-filters.store.ts +++ b/src/renderer/components/item-object-list/page-filters.store.ts @@ -1,6 +1,5 @@ import { computed, observable, reaction } from "mobx"; import { autobind } from "../../utils"; -import { namespaceStore } from "../+namespaces/namespace.store"; import { searchUrlParam } from "../input/search-input-url"; export enum FilterType { @@ -24,33 +23,6 @@ export class PageFiltersStore { constructor() { this.syncWithGlobalSearch(); - this.syncWithContextNamespace(); - } - - // todo: refactor - protected syncWithContextNamespace() { - const disposers = [ - reaction(() => this.getValues(FilterType.NAMESPACE), filteredNs => { - if (filteredNs.length !== namespaceStore.contextNamespaces.length) { - namespaceStore.setContext(filteredNs); - } - }), - namespaceStore.onContextChange(namespaces => { - const filteredNs = this.getValues(FilterType.NAMESPACE); - const isChanged = namespaces.length !== filteredNs.length; - - if (isChanged) { - this.filters.replace([ - ...this.filters.filter(({ type }) => type !== FilterType.NAMESPACE), - ...namespaces.map(ns => ({ type: FilterType.NAMESPACE, value: ns })), - ]); - } - }, { - fireImmediately: true - }) - ]; - - return () => disposers.forEach(dispose => dispose()); } protected syncWithGlobalSearch() { diff --git a/src/renderer/item.store.ts b/src/renderer/item.store.ts index ce2c5eac25..a9ac3179c9 100644 --- a/src/renderer/item.store.ts +++ b/src/renderer/item.store.ts @@ -26,6 +26,10 @@ export abstract class ItemStore { return this.items.toJS(); } + public getTotalCount(): number { + return this.items.length; + } + getByName(name: string, ...args: any[]): T; getByName(name: string): T { return this.items.find(item => item.getName() === name); diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index d1fd3f6aea..d90de26e67 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -3,7 +3,7 @@ import type { ClusterContext } from "./components/context"; import { action, computed, observable, reaction, when } from "mobx"; import { autobind } from "./utils"; import { KubeObject } from "./api/kube-object"; -import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api"; +import { IKubeWatchEvent } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api"; @@ -21,6 +21,7 @@ export abstract class KubeObjectStore extends ItemSt abstract api: KubeApi; public readonly limit?: number; public readonly bufferSize: number = 50000; + private loadedNamespaces: string[] = []; contextReady = when(() => Boolean(this.context)); @@ -43,6 +44,10 @@ export abstract class KubeObjectStore extends ItemSt }); } + getTotalCount(): number { + return this.contextItems.length; + } + get query(): IKubeApiQueryParams { const { limit } = this; @@ -107,8 +112,12 @@ export abstract class KubeObjectStore extends ItemSt const isLoadingAll = this.context.allNamespaces.every(ns => namespaces.includes(ns)); if (isLoadingAll) { + this.loadedNamespaces = []; + return api.list({}, this.query); } else { + this.loadedNamespaces = namespaces; + return Promise // load resources per namespace .all(namespaces.map(namespace => api.list({ namespace }))) .then(items => items.flat()); @@ -248,11 +257,6 @@ export abstract class KubeObjectStore extends ItemSt protected eventsBuffer = observable.array>([], { deep: false }); protected bindWatchEventsUpdater(delay = 1000) { - kubeWatchApi.onMessage.addListener((evt: IKubeWatchMessage) => { - if (!this.isLoaded || evt.store !== this) return; - this.eventsBuffer.push(evt.data); - }); - reaction(() => this.eventsBuffer.length, this.updateFromEventsBuffer, { delay }); @@ -263,7 +267,29 @@ export abstract class KubeObjectStore extends ItemSt } subscribe(apis = this.getSubscribeApis()) { - return kubeWatchApi.subscribeApi(apis); + let disposers: {(): void}[] = []; + + const callback = (data: IKubeWatchEvent) => { + this.eventsBuffer.push(data); + }; + + if (this.context.cluster.isGlobalWatchEnabled) { + disposers = apis.map(api => api.watch({ + namespace: "", + callback: (data) => callback(data) + })); + } else { + apis.map(api => { + this.loadedNamespaces.forEach((namespace) => { + disposers.push(api.watch({ + namespace, + callback: (data) => callback(data) + })); + }); + }); + } + + return () => disposers.forEach(dispose => dispose()); } @action diff --git a/yarn.lock b/yarn.lock index 114e2f8b5a..cd3384eba5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1079,6 +1079,13 @@ resolved "https://registry.yarnpkg.com/@types/boom/-/boom-7.3.0.tgz#33280c5552d4cfabc21b8b7e0f6d29292decd985" integrity sha512-PH7bfkt1nu4pnlxz+Ws+wwJJF1HE12W3ia+Iace2JT7q56DLH3hbyjOJyNHJYRxk3PkKaC36fHfHKyeG1rMgCA== +"@types/byline@^4.2.32": + version "4.2.32" + resolved "https://registry.yarnpkg.com/@types/byline/-/byline-4.2.32.tgz#9d35ec15968056118548412ee24c2c3026c997dc" + integrity sha512-qtlm/J6XOO9p+Ep/ZB5+mCFEDhzWDDHWU4a1eReN7lkPZXW9rkloq2jcAhvKKmlO5tL2GSvKROb+PTsNVhBiyQ== + dependencies: + "@types/node" "*" + "@types/caseless@*": version "0.12.2" resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8" @@ -1608,6 +1615,14 @@ "@types/prop-types" "*" csstype "^3.0.2" +"@types/readable-stream@^2.3.9": + version "2.3.9" + resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.9.tgz#40a8349e6ace3afd2dd1b6d8e9b02945de4566a9" + integrity sha512-sqsgQqFT7HmQz/V5jH1O0fvQQnXAJO46Gg9LRO/JPfjmVmGUlcx831TZZO3Y3HtWhIkzf3kTsNT0Z0kzIhIvZw== + dependencies: + "@types/node" "*" + safe-buffer "*" + "@types/relateurl@*": version "0.2.28" resolved "https://registry.yarnpkg.com/@types/relateurl/-/relateurl-0.2.28.tgz#6bda7db8653fa62643f5ee69e9f69c11a392e3a6" @@ -11449,6 +11464,14 @@ readable-stream@~1.1.10: isarray "0.0.1" string_decoder "~0.10.x" +readable-web-to-node-stream@^3.0.1: + version "3.0.1" + resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845" + integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA== + dependencies: + "@types/readable-stream" "^2.3.9" + readable-stream "^3.6.0" + readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309" @@ -11867,16 +11890,16 @@ rxjs@^6.5.2: dependencies: tslib "^1.9.0" +safe-buffer@*, safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: + version "5.2.1" + resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" + integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== + safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1: version "5.1.2" resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d" integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g== -safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0: - version "5.2.1" - resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6" - integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ== - safe-regex@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/safe-regex/-/safe-regex-1.1.0.tgz#40a3669f3b077d1e943d44629e157dd48023bf2e"