From 6f7cb4d568cbad3a642e2e2b0000e5e3c04c02c9 Mon Sep 17 00:00:00 2001 From: Jari Kolehmainen Date: Fri, 5 Feb 2021 09:45:55 +0200 Subject: [PATCH] Revert "Watch-api streaming reworks (#1990)" This reverts commit 078f952b363df6492f37ff833185cd1c620e0902. Signed-off-by: Jari Kolehmainen --- src/common/utils/delay.ts | 6 - src/common/utils/index.ts | 1 - src/main/router.ts | 2 +- src/main/routes/watch-route.ts | 97 +--- src/renderer/api/api-manager.ts | 4 +- src/renderer/api/kube-api.ts | 14 +- src/renderer/api/kube-watch-api.ts | 449 ++++++------------ .../components/+cluster/cluster-overview.tsx | 44 +- .../components/+events/event.store.ts | 4 - .../+namespaces/namespace-select.tsx | 27 +- .../components/+namespaces/namespace.store.ts | 6 +- .../+network-services/service-details.tsx | 16 +- src/renderer/components/+nodes/nodes.store.ts | 5 - .../role-bindings.store.ts | 4 +- .../+user-management-roles/roles.store.ts | 4 +- .../+workloads-overview/overview.tsx | 60 ++- src/renderer/components/app.tsx | 86 ++-- .../item-object-list/item-list-layout.tsx | 60 ++- .../kube-object/kube-object-list-layout.tsx | 16 +- src/renderer/kube-object.store.ts | 29 +- 20 files changed, 405 insertions(+), 529 deletions(-) delete mode 100644 src/common/utils/delay.ts diff --git a/src/common/utils/delay.ts b/src/common/utils/delay.ts deleted file mode 100644 index 208e042759..0000000000 --- a/src/common/utils/delay.ts +++ /dev/null @@ -1,6 +0,0 @@ -// Create async delay for provided timeout in milliseconds - -export async function delay(timeoutMs = 1000) { - if (!timeoutMs) return; - await new Promise(resolve => setTimeout(resolve, timeoutMs)); -} diff --git a/src/common/utils/index.ts b/src/common/utils/index.ts index 942c675f0a..582135d7f0 100644 --- a/src/common/utils/index.ts +++ b/src/common/utils/index.ts @@ -7,7 +7,6 @@ export * from "./autobind"; export * from "./base64"; export * from "./camelCase"; export * from "./cloneJson"; -export * from "./delay"; export * from "./debouncePromise"; export * from "./defineGlobal"; export * from "./getRandId"; diff --git a/src/main/router.ts b/src/main/router.ts index 875bd319b5..b5dd8b99ac 100644 --- a/src/main/router.ts +++ b/src/main/router.ts @@ -147,7 +147,7 @@ export class Router { this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute)); // Watch API - this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); + this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); // Metrics API this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute)); diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index 2c86314908..eb9f007eae 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -1,29 +1,10 @@ -import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api"; - -import plimit from "p-limit"; -import { delay } from "../../common/utils"; import { LensApiRequest } from "../router"; import { LensApi } from "../lens-api"; -import { KubeConfig, Watch } from "@kubernetes/client-node"; +import { Watch, KubeConfig } from "@kubernetes/client-node"; import { ServerResponse } from "http"; import { Request } from "request"; import logger from "../logger"; -export interface IKubeWatchEvent { - type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END"; - object?: T; -} - -export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent { - type: "STREAM_END"; - url: string; - status: number; -} - -export interface IWatchRoutePayload { - apis: string[]; // kube-api url list for subscribing to watch events -} - class ApiWatcher { private apiUrl: string; private response: ServerResponse; @@ -43,7 +24,6 @@ class ApiWatcher { clearInterval(this.processor); } this.processor = setInterval(() => { - if (this.response.finished) return; const events = this.eventBuffer.splice(0); events.map(event => this.sendEvent(event)); @@ -53,9 +33,7 @@ class ApiWatcher { } public stop() { - if (!this.watchRequest) { - return; - } + if (!this.watchRequest) { return; } if (this.processor) { clearInterval(this.processor); @@ -64,14 +42,11 @@ class ApiWatcher { try { this.watchRequest.abort(); - - const event: IKubeWatchEventStreamEnd = { + this.sendEvent({ type: "STREAM_END", url: this.apiUrl, status: 410, - }; - - this.sendEvent(event); + }); logger.debug("watch aborted"); } catch (error) { logger.error(`Watch abort errored:${error}`); @@ -90,72 +65,50 @@ class ApiWatcher { this.watchRequest.abort(); } - private sendEvent(evt: IKubeWatchEvent) { - this.response.write(`${JSON.stringify(evt)}\n`); + private sendEvent(evt: any) { + // convert to "text/event-stream" format + this.response.write(`data: ${JSON.stringify(evt)}\n\n`); } } class WatchRoute extends LensApi { - private response: ServerResponse; - private setResponse(response: ServerResponse) { - // clean up previous connection and stop all corresponding watch-api requests - // otherwise it happens only by request timeout or something else.. - this.response?.destroy(); - this.response = response; - } + public async routeWatch(request: LensApiRequest) { + const { response, cluster} = request; + const apis: string[] = request.query.getAll("api"); + const watchers: ApiWatcher[] = []; - public async routeWatch(request: LensApiRequest) { - const { response, cluster, payload: { apis } = {} } = request; - - if (!apis?.length) { + if (!apis.length) { this.respondJson(response, { - message: "watch apis list is empty" + message: "Empty request. Query params 'api' are not provided.", + example: "?api=/api/v1/pods&api=/api/v1/nodes", }, 400); return; } - this.setResponse(response); - response.setHeader("Content-Type", "application/json"); + response.setHeader("Content-Type", "text/event-stream"); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); - // limit concurrent k8s requests to avoid possible ECONNRESET-error - const requests = plimit(5); - const watchers = new Map(); - let isWatchRequestEnded = false; - apis.forEach(apiUrl => { const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); - watchers.set(apiUrl, watcher); - - requests(async () => { - if (isWatchRequestEnded) return; - await watcher.start(); - await delay(100); - }); - }); - - function onRequestEnd() { - if (isWatchRequestEnded) return; - isWatchRequestEnded = true; - requests.clearQueue(); - watchers.forEach(watcher => watcher.stop()); - watchers.clear(); - } - - request.raw.req.on("end", () => { - logger.info("Watch request end"); - onRequestEnd(); + watcher.start(); + watchers.push(watcher); }); request.raw.req.on("close", () => { - logger.info("Watch request close"); - onRequestEnd(); + logger.debug("Watch request closed"); + watchers.map(watcher => watcher.stop()); }); + + request.raw.req.on("end", () => { + logger.debug("Watch request ended"); + watchers.map(watcher => watcher.stop()); + }); + } } diff --git a/src/renderer/api/api-manager.ts b/src/renderer/api/api-manager.ts index 47500adf79..629a0f29c2 100644 --- a/src/renderer/api/api-manager.ts +++ b/src/renderer/api/api-manager.ts @@ -2,7 +2,7 @@ import type { KubeObjectStore } from "../kube-object.store"; import { action, observable } from "mobx"; import { autobind } from "../utils"; -import { KubeApi, parseKubeApi } from "./kube-api"; +import { KubeApi } from "./kube-api"; @autobind() export class ApiManager { @@ -11,7 +11,7 @@ export class ApiManager { getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) { if (typeof pathOrCallback === "string") { - return this.apis.get(pathOrCallback) || this.apis.get(parseKubeApi(pathOrCallback).apiBase); + return this.apis.get(pathOrCallback) || this.apis.get(KubeApi.parseApi(pathOrCallback).apiBase); } return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true)); diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index e62603b14f..8a3a2517c2 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -92,6 +92,14 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) { } export class KubeApi { + static parseApi = parseKubeApi; + + static watchAll(...apis: KubeApi[]) { + const disposers = apis.map(api => api.watch()); + + return () => disposers.forEach(unwatch => unwatch()); + } + readonly kind: string; readonly apiBase: string; readonly apiPrefix: string; @@ -116,7 +124,7 @@ export class KubeApi { if (!options.apiBase) { options.apiBase = objectConstructor.apiBase; } - const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase); + const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = KubeApi.parseApi(options.apiBase); this.kind = kind; this.isNamespaced = isNamespaced; @@ -149,7 +157,7 @@ export class KubeApi { for (const apiUrl of apiBases) { // Split e.g. "/apis/extensions/v1beta1/ingresses" to parts - const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl); + const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = KubeApi.parseApi(apiUrl); // Request available resources try { @@ -358,7 +366,7 @@ export class KubeApi { } watch(): () => void { - return kubeWatchApi.subscribeApi(this); + return kubeWatchApi.subscribe(this); } } diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 8adf58676f..fe35a04baa 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -1,349 +1,202 @@ -// Kubernetes watch-api client -// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams +// Kubernetes watch-api consumer -import type { Cluster } from "../../main/cluster"; -import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; -import type { KubeObject } from "./kube-object"; -import type { KubeObjectStore } from "../kube-object.store"; -import type { NamespaceStore } from "../components/+namespaces/namespace.store"; - -import plimit from "p-limit"; -import debounce from "lodash/debounce"; -import { comparer, computed, observable, reaction } from "mobx"; +import { computed, observable, reaction } from "mobx"; +import { stringify } from "querystring"; import { autobind, EventEmitter } from "../utils"; -import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; -import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; -import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; +import { KubeJsonApiData } from "./kube-json-api"; +import type { KubeObjectStore } from "../kube-object.store"; +import { ensureObjectSelfLink, KubeApi } from "./kube-api"; import { apiManager } from "./api-manager"; +import { apiPrefix, isDevelopment } from "../../common/vars"; +import { getHostedCluster } from "../../common/cluster-store"; -export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; - -export interface IKubeWatchMessage { - data?: IKubeWatchEvent - error?: IKubeWatchEvent; - api?: KubeApi; - store?: KubeObjectStore; +export interface IKubeWatchEvent { + type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR"; + object?: T; } -export interface IKubeWatchSubscribeStoreOptions { - preload?: boolean; // preload store items, default: true - waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true - cacheLoading?: boolean; // when enabled loading store will be skipped, default: false +export interface IKubeWatchRouteEvent { + type: "STREAM_END"; + url: string; + status: number; } -export interface IKubeWatchReconnectOptions { - reconnectAttempts: number; - timeout: number; -} - -export interface IKubeWatchLog { - message: string | Error; - meta?: object; +export interface IKubeWatchRouteQuery { + api: string | string[]; } @autobind() export class KubeWatchApi { - private cluster: Cluster; - private namespaceStore: NamespaceStore; - - private requestId = 0; - private isConnected = false; - private reader: ReadableStreamReader; - private subscribers = observable.map(); - - // events - public onMessage = new EventEmitter<[IKubeWatchMessage]>(); - - @computed get isActive(): boolean { - return this.apis.length > 0; - } - - @computed get apis(): string[] { - const { cluster, namespaceStore } = this; - const activeApis = Array.from(this.subscribers.keys()); - - return activeApis.map(api => { - if (!cluster.isAllowedResource(api.kind)) { - return []; - } - - if (api.isNamespaced) { - return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); - } else { - return api.getWatchUrl(); - } - }).flat(); - } + protected evtSource: EventSource; + protected onData = new EventEmitter<[IKubeWatchEvent]>(); + protected subscribers = observable.map(); + protected reconnectTimeoutMs = 5000; + protected maxReconnectsOnError = 10; + protected reconnectAttempts = this.maxReconnectsOnError; constructor() { - this.init(); - } - - private async init() { - const { getHostedCluster } = await import("../../common/cluster-store"); - const { namespaceStore } = await import("../components/+namespaces/namespace.store"); - - await namespaceStore.whenReady; - - this.cluster = getHostedCluster(); - this.namespaceStore = namespaceStore; - this.bindAutoConnect(); - } - - private bindAutoConnect() { - const connect = debounce(() => this.connect(), 1000); - - reaction(() => this.apis, connect, { + reaction(() => this.activeApis, () => this.connect(), { fireImmediately: true, - equals: comparer.structural, + delay: 500, }); + } - window.addEventListener("online", () => this.connect()); - window.addEventListener("offline", () => this.disconnect()); - setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m + @computed get activeApis() { + return Array.from(this.subscribers.keys()); } getSubscribersCount(api: KubeApi) { return this.subscribers.get(api) || 0; } - isAllowedApi(api: KubeApi): boolean { - return !!this?.cluster.isAllowedResource(api.kind); - } - - subscribeApi(api: KubeApi | KubeApi[]): () => void { - const apis: KubeApi[] = [api].flat(); - + subscribe(...apis: KubeApi[]) { apis.forEach(api => { - if (!this.isAllowedApi(api)) return; // skip this.subscribers.set(api, this.getSubscribersCount(api) + 1); }); - return () => { - apis.forEach(api => { - const count = this.getSubscribersCount(api) - 1; + return () => apis.forEach(api => { + const count = this.getSubscribersCount(api) - 1; - if (count <= 0) this.subscribers.delete(api); - else this.subscribers.set(api, count); - }); - }; - } - - subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void { - const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options; - const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages - const preloading: Promise[] = []; - const apis = new Set(stores.map(store => store.getSubscribeApis()).flat()); - const unsubscribeList: (() => void)[] = []; - let isUnsubscribed = false; - - const subscribe = () => { - if (isUnsubscribed) return; - apis.forEach(api => unsubscribeList.push(this.subscribeApi(api))); - }; - - if (preload) { - for (const store of stores) { - preloading.push(limitRequests(async () => { - if (cacheLoading && store.isLoaded) return; // skip - - return store.loadAll(); - })); - } - } - - if (waitUntilLoaded) { - Promise.all(preloading).then(subscribe, error => { - this.log({ - message: new Error("Loading stores has failed"), - meta: { stores, error, options }, - }); - }); - } else { - subscribe(); - } - - // unsubscribe - return () => { - if (isUnsubscribed) return; - isUnsubscribed = true; - limitRequests.clearQueue(); - unsubscribeList.forEach(unsubscribe => unsubscribe()); - }; - } - - protected async connectionCheck() { - if (!this.isConnected) { - this.log({ message: "Offline: reconnecting.." }); - await this.connect(); - } - - this.log({ - message: `Connection check: ${this.isConnected ? "online" : "offline"}`, - meta: { connected: this.isConnected }, + if (count <= 0) this.subscribers.delete(api); + else this.subscribers.set(api, count); }); } - protected async connect(apis = this.apis) { - this.disconnect(); // close active connections first + // FIXME: use POST to send apis for subscribing (list could be huge) + // TODO: try to use normal fetch res.body stream to consume watch-api updates + // https://github.com/lensapp/lens/issues/1898 + protected async getQuery() { + const { namespaceStore } = await import("../components/+namespaces/namespace.store"); - if (!navigator.onLine || !apis.length) { - this.isConnected = false; + await namespaceStore.whenReady; + const { isAdmin } = getHostedCluster(); + return { + api: this.activeApis.map(api => { + if (isAdmin && !api.isNamespaced) { + return api.getWatchUrl(); + } + + if (api.isNamespaced) { + return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); + } + + return []; + }).flat() + }; + } + + // todo: maybe switch to websocket to avoid often reconnects + @autobind() + protected async connect() { + if (this.evtSource) this.disconnect(); // close previous connection + + const query = await this.getQuery(); + + if (!this.activeApis.length || !query.api.length) { return; } - this.log({ - message: "Connecting", - meta: { apis } - }); + const apiUrl = `${apiPrefix}/watch?${stringify(query)}`; - try { - const requestId = ++this.requestId; - const abortController = new AbortController(); + this.evtSource = new EventSource(apiUrl); + this.evtSource.onmessage = this.onMessage; + this.evtSource.onerror = this.onError; + this.writeLog("CONNECTING", query.api); + } - const request = await fetch(`${apiPrefix}/watch`, { - method: "POST", - body: JSON.stringify({ apis } as IWatchRoutePayload), - signal: abortController.signal, - headers: { - "content-type": "application/json" - } - }); - - // request above is stale since new request-id has been issued - if (this.requestId !== requestId) { - abortController.abort(); - - return; - } - - let jsonBuffer = ""; - const stream = request.body.pipeThrough(new TextDecoderStream()); - const reader = stream.getReader(); - - this.isConnected = true; - this.reader = reader; - - while (true) { - const { done, value } = await reader.read(); - - if (done) break; // exit - - const events = (jsonBuffer + value).split("\n"); - - jsonBuffer = this.processBuffer(events); - } - } catch (error) { - this.log({ message: error }); - } finally { - this.isConnected = false; + reconnect() { + if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) { + this.reconnectAttempts = this.maxReconnectsOnError; + this.connect(); } } protected disconnect() { - this.reader?.cancel(); - this.reader = null; - this.isConnected = false; + if (!this.evtSource) return; + this.evtSource.close(); + this.evtSource.onmessage = null; + this.evtSource = null; } - // process received stream events, returns unprocessed buffer chunk if any - protected processBuffer(events: string[]): string { - for (const json of events) { - try { - const kubeEvent: IKubeWatchEvent = JSON.parse(json); - const message = this.getMessage(kubeEvent); + protected onMessage(evt: MessageEvent) { + if (!evt.data) return; + const data = JSON.parse(evt.data); - this.onMessage.emit(message); - } catch (error) { - return json; - } - } - - return ""; - } - - protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { - const message: IKubeWatchMessage = {}; - - switch (event.type) { - case "ADDED": - case "DELETED": - - case "MODIFIED": { - const data = event as IKubeWatchEvent; - const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion); - - message.data = data; - - if (api) { - ensureObjectSelfLink(api, data.object); - - const { namespace, resourceVersion } = data.object.metadata; - - api.setResourceVersion(namespace, resourceVersion); - api.setResourceVersion("", resourceVersion); - - message.api = api; - message.store = apiManager.getStore(api); - } - break; - } - - case "ERROR": - message.error = event as IKubeWatchEvent; - break; - - case "STREAM_END": { - this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, { - reconnectAttempts: 5, - timeout: 1000, - }); - break; - } - } - - return message; - } - - protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) { - const { apiBase, namespace } = parseKubeApi(event.url); - const api = apiManager.getApi(apiBase); - - if (!api) return; - - try { - await api.refreshResourceVersion({ namespace }); - this.connect(); - } catch (error) { - this.log({ - message: new Error(`Failed to connect on single stream end: ${error}`), - meta: { event, error }, - }); - - if (this.isActive && opts?.reconnectAttempts > 0) { - opts.reconnectAttempts--; - setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event - } - } - } - - protected log({ message, meta = {} }: IKubeWatchLog) { - if (isProduction && !isDebugging) { - return; - } - - const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`; - const isError = message instanceof Error; - const textStyle = `font-weight: bold;`; - const time = new Date().toLocaleString(); - - if (isError) { - console.error(logMessage, textStyle, { time, ...meta }); + if ((data as IKubeWatchEvent).object) { + this.onData.emit(data); } else { - console.info(logMessage, textStyle, { time, ...meta }); + this.onRouteEvent(data); } } + + protected async onRouteEvent(event: IKubeWatchRouteEvent) { + if (event.type === "STREAM_END") { + this.disconnect(); + const { apiBase, namespace } = KubeApi.parseApi(event.url); + const api = apiManager.getApi(apiBase); + + if (api) { + try { + await api.refreshResourceVersion({ namespace }); + this.reconnect(); + } catch (error) { + console.error("failed to refresh resource version", error); + + if (this.subscribers.size > 0) { + setTimeout(() => { + this.onRouteEvent(event); + }, 1000); + } + } + } + } + } + + protected onError(evt: MessageEvent) { + const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this; + + if (evt.eventPhase === EventSource.CLOSED) { + if (attemptsRemain > 0) { + this.reconnectAttempts--; + setTimeout(() => this.connect(), reconnectTimeoutMs); + } + } + } + + protected writeLog(...data: any[]) { + if (isDevelopment) { + console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data); + } + } + + addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) { + const listener = (evt: IKubeWatchEvent) => { + if (evt.type === "ERROR") { + return; // e.g. evt.object.message == "too old resource version" + } + + const { namespace, resourceVersion } = evt.object.metadata; + const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion); + + api.setResourceVersion(namespace, resourceVersion); + api.setResourceVersion("", resourceVersion); + + ensureObjectSelfLink(api, evt.object); + + if (store == apiManager.getStore(api)) { + callback(evt); + } + }; + + this.onData.addListener(listener); + + return () => this.onData.removeListener(listener); + } + + reset() { + this.subscribers.clear(); + } } export const kubeWatchApi = new KubeWatchApi(); diff --git a/src/renderer/components/+cluster/cluster-overview.tsx b/src/renderer/components/+cluster/cluster-overview.tsx index 1f7a7f6b78..104c6fd022 100644 --- a/src/renderer/components/+cluster/cluster-overview.tsx +++ b/src/renderer/components/+cluster/cluster-overview.tsx @@ -3,9 +3,13 @@ import "./cluster-overview.scss"; import React from "react"; import { reaction } from "mobx"; import { disposeOnUnmount, observer } from "mobx-react"; + +import { eventStore } from "../+events/event.store"; import { nodesStore } from "../+nodes/nodes.store"; import { podsStore } from "../+workloads-pods/pods.store"; import { getHostedCluster } from "../../../common/cluster-store"; +import { isAllowedResource } from "../../../common/rbac"; +import { KubeObjectStore } from "../../kube-object.store"; import { interval } from "../../utils"; import { TabLayout } from "../layout/tab-layout"; import { Spinner } from "../spinner"; @@ -13,33 +17,45 @@ import { ClusterIssues } from "./cluster-issues"; import { ClusterMetrics } from "./cluster-metrics"; import { clusterOverviewStore } from "./cluster-overview.store"; import { ClusterPieCharts } from "./cluster-pie-charts"; -import { eventStore } from "../+events/event.store"; -import { kubeWatchApi } from "../../api/kube-watch-api"; @observer export class ClusterOverview extends React.Component { - private metricPoller = interval(60, () => this.loadMetrics()); + private stores: KubeObjectStore[] = []; + private subscribers: Array<() => void> = []; + private metricPoller = interval(60, this.loadMetrics); + + @disposeOnUnmount + fetchMetrics = reaction( + () => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher + () => this.metricPoller.restart(true) + ); loadMetrics() { getHostedCluster().available && clusterOverviewStore.loadMetrics(); } - componentDidMount() { - this.metricPoller.start(true); + async componentDidMount() { + if (isAllowedResource("nodes")) { + this.stores.push(nodesStore); + } - disposeOnUnmount(this, [ - kubeWatchApi.subscribeStores([nodesStore, podsStore, eventStore], { - preload: true, - }), + if (isAllowedResource("pods")) { + this.stores.push(podsStore); + } - reaction( - () => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher - () => this.metricPoller.restart(true) - ), - ]); + if (isAllowedResource("events")) { + this.stores.push(eventStore); + } + + await Promise.all(this.stores.map(store => store.loadAll())); + this.loadMetrics(); + + this.subscribers = this.stores.map(store => store.subscribe()); + this.metricPoller.start(); } componentWillUnmount() { + this.subscribers.forEach(dispose => dispose()); // unsubscribe all this.metricPoller.stop(); } diff --git a/src/renderer/components/+events/event.store.ts b/src/renderer/components/+events/event.store.ts index d6090be947..3651ce1549 100644 --- a/src/renderer/components/+events/event.store.ts +++ b/src/renderer/components/+events/event.store.ts @@ -52,10 +52,6 @@ export class EventStore extends KubeObjectStore { return compact(eventsWithError); } - - getWarningsCount() { - return this.getWarnings().length; - } } export const eventStore = new EventStore(); diff --git a/src/renderer/components/+namespaces/namespace-select.tsx b/src/renderer/components/+namespaces/namespace-select.tsx index 6ee7ea2d57..079a9cd0b6 100644 --- a/src/renderer/components/+namespaces/namespace-select.tsx +++ b/src/renderer/components/+namespaces/namespace-select.tsx @@ -2,14 +2,13 @@ import "./namespace-select.scss"; import React from "react"; import { computed } from "mobx"; -import { disposeOnUnmount, observer } from "mobx-react"; +import { observer } from "mobx-react"; import { Select, SelectOption, SelectProps } from "../select"; -import { cssNames } from "../../utils"; +import { cssNames, noop } from "../../utils"; import { Icon } from "../icon"; import { namespaceStore } from "./namespace.store"; import { FilterIcon } from "../item-object-list/filter-icon"; import { FilterType } from "../item-object-list/page-filters.store"; -import { kubeWatchApi } from "../../api/kube-watch-api"; interface Props extends SelectProps { showIcons?: boolean; @@ -29,13 +28,17 @@ const defaultProps: Partial = { @observer export class NamespaceSelect extends React.Component { static defaultProps = defaultProps as object; + private unsubscribe = noop; - componentDidMount() { - disposeOnUnmount(this, [ - kubeWatchApi.subscribeStores([namespaceStore], { - preload: true, - }) - ]); + async componentDidMount() { + if (!namespaceStore.isLoaded) { + await namespaceStore.loadAll(); + } + this.unsubscribe = namespaceStore.subscribe(); + } + + componentWillUnmount() { + this.unsubscribe(); } @computed get options(): SelectOption[] { @@ -57,7 +60,7 @@ export class NamespaceSelect extends React.Component { return label || ( <> - {showIcons && } + {showIcons && } {value} ); @@ -100,9 +103,9 @@ export class NamespaceSelectFilter extends React.Component { return (
- + {namespace} - {isSelected && } + {isSelected && }
); }} diff --git a/src/renderer/components/+namespaces/namespace.store.ts b/src/renderer/components/+namespaces/namespace.store.ts index 63bb7525de..50ec2c8038 100644 --- a/src/renderer/components/+namespaces/namespace.store.ts +++ b/src/renderer/components/+namespaces/namespace.store.ts @@ -117,15 +117,15 @@ export class NamespaceStore extends KubeObjectStore { return namespaces; } - getSubscribeApis() { + subscribe(apis = [this.api]) { const { accessibleNamespaces } = getHostedCluster(); // if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted if (accessibleNamespaces.length > 0) { - return []; + return Function; // no-op } - return super.getSubscribeApis(); + return super.subscribe(apis); } protected async loadItems(params: KubeObjectStoreLoadingParams) { diff --git a/src/renderer/components/+network-services/service-details.tsx b/src/renderer/components/+network-services/service-details.tsx index 80c9bd4dc7..58cbe0a86e 100644 --- a/src/renderer/components/+network-services/service-details.tsx +++ b/src/renderer/components/+network-services/service-details.tsx @@ -1,18 +1,17 @@ import "./service-details.scss"; import React from "react"; -import { disposeOnUnmount, observer } from "mobx-react"; +import { observer } from "mobx-react"; import { DrawerItem, DrawerTitle } from "../drawer"; import { Badge } from "../badge"; import { KubeEventDetails } from "../+events/kube-event-details"; import { KubeObjectDetailsProps } from "../kube-object"; -import { Service } from "../../api/endpoints"; +import { Service, endpointApi } from "../../api/endpoints"; import { KubeObjectMeta } from "../kube-object/kube-object-meta"; import { ServicePortComponent } from "./service-port-component"; import { endpointStore } from "../+network-endpoints/endpoints.store"; import { ServiceDetailsEndpoint } from "./service-details-endpoint"; import { kubeObjectDetailRegistry } from "../../api/kube-object-detail-registry"; -import { kubeWatchApi } from "../../api/kube-watch-api"; interface Props extends KubeObjectDetailsProps { } @@ -20,11 +19,10 @@ interface Props extends KubeObjectDetailsProps { @observer export class ServiceDetails extends React.Component { componentDidMount() { - disposeOnUnmount(this, [ - kubeWatchApi.subscribeStores([endpointStore], { - preload: true, - }), - ]); + if (!endpointStore.isLoaded) { + endpointStore.loadAll(); + } + endpointApi.watch(); } render() { @@ -79,7 +77,7 @@ export class ServiceDetails extends React.Component { )} - + ); } diff --git a/src/renderer/components/+nodes/nodes.store.ts b/src/renderer/components/+nodes/nodes.store.ts index b301015747..c0385b078b 100644 --- a/src/renderer/components/+nodes/nodes.store.ts +++ b/src/renderer/components/+nodes/nodes.store.ts @@ -1,4 +1,3 @@ -import { sum } from "lodash"; import { action, computed, observable } from "mobx"; import { clusterApi, IClusterMetrics, INodeMetrics, Node, nodesApi } from "../../api/endpoints"; import { autobind } from "../../utils"; @@ -63,10 +62,6 @@ export class NodesStore extends KubeObjectStore { }); } - getWarningsCount(): number { - return sum(this.items.map((node: Node) => node.getWarningConditions().length)); - } - reset() { super.reset(); this.metrics = {}; diff --git a/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts b/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts index 620fbd86ac..71890acc44 100644 --- a/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts +++ b/src/renderer/components/+user-management-roles-bindings/role-bindings.store.ts @@ -9,8 +9,8 @@ import { apiManager } from "../../api/api-manager"; export class RoleBindingsStore extends KubeObjectStore { api = clusterRoleBindingApi; - getSubscribeApis() { - return [clusterRoleBindingApi, roleBindingApi]; + subscribe() { + return super.subscribe([clusterRoleBindingApi, roleBindingApi]); } protected sortItems(items: RoleBinding[]) { diff --git a/src/renderer/components/+user-management-roles/roles.store.ts b/src/renderer/components/+user-management-roles/roles.store.ts index 82b0e66612..7d2e90dd38 100644 --- a/src/renderer/components/+user-management-roles/roles.store.ts +++ b/src/renderer/components/+user-management-roles/roles.store.ts @@ -7,8 +7,8 @@ import { apiManager } from "../../api/api-manager"; export class RolesStore extends KubeObjectStore { api = clusterRoleApi; - getSubscribeApis() { - return [roleApi, clusterRoleApi]; + subscribe() { + return super.subscribe([roleApi, clusterRoleApi]); } protected sortItems(items: Role[]) { diff --git a/src/renderer/components/+workloads-overview/overview.tsx b/src/renderer/components/+workloads-overview/overview.tsx index 50a25ef87c..351b57462c 100644 --- a/src/renderer/components/+workloads-overview/overview.tsx +++ b/src/renderer/components/+workloads-overview/overview.tsx @@ -1,7 +1,8 @@ import "./overview.scss"; import React from "react"; -import { disposeOnUnmount, observer } from "mobx-react"; +import { observable, when } from "mobx"; +import { observer } from "mobx-react"; import { OverviewStatuses } from "./overview-statuses"; import { RouteComponentProps } from "react-router"; import { IWorkloadsOverviewRouteParams } from "../+workloads"; @@ -14,23 +15,60 @@ import { replicaSetStore } from "../+workloads-replicasets/replicasets.store"; import { jobStore } from "../+workloads-jobs/job.store"; import { cronJobStore } from "../+workloads-cronjobs/cronjob.store"; import { Events } from "../+events"; +import { KubeObjectStore } from "../../kube-object.store"; import { isAllowedResource } from "../../../common/rbac"; -import { kubeWatchApi } from "../../api/kube-watch-api"; +import { namespaceStore } from "../+namespaces/namespace.store"; interface Props extends RouteComponentProps { } @observer export class WorkloadsOverview extends React.Component { - componentDidMount() { - disposeOnUnmount(this, [ - kubeWatchApi.subscribeStores([ - podsStore, deploymentStore, daemonSetStore, statefulSetStore, replicaSetStore, - jobStore, cronJobStore, eventStore, - ], { - preload: true, - }), - ]); + @observable isLoading = false; + @observable isUnmounting = false; + + async componentDidMount() { + const stores: KubeObjectStore[] = [ + isAllowedResource("pods") && podsStore, + isAllowedResource("deployments") && deploymentStore, + isAllowedResource("daemonsets") && daemonSetStore, + isAllowedResource("statefulsets") && statefulSetStore, + isAllowedResource("replicasets") && replicaSetStore, + isAllowedResource("jobs") && jobStore, + isAllowedResource("cronjobs") && cronJobStore, + isAllowedResource("events") && eventStore, + ].filter(Boolean); + + const unsubscribeMap = new Map void>(); + + const loadStores = async () => { + this.isLoading = true; + + for (const store of stores) { + if (this.isUnmounting) break; + + try { + await store.loadAll(); + unsubscribeMap.get(store)?.(); // unsubscribe previous watcher + unsubscribeMap.set(store, store.subscribe()); + } catch (error) { + console.error("loading store error", error); + } + } + this.isLoading = false; + }; + + namespaceStore.onContextChange(loadStores, { + fireImmediately: true, + }); + + await when(() => this.isUnmounting && !this.isLoading); + unsubscribeMap.forEach(dispose => dispose()); + unsubscribeMap.clear(); + } + + componentWillUnmount() { + this.isUnmounting = true; } render() { diff --git a/src/renderer/components/app.tsx b/src/renderer/components/app.tsx index 2b36bf0057..23d4c807ed 100755 --- a/src/renderer/components/app.tsx +++ b/src/renderer/components/app.tsx @@ -1,5 +1,5 @@ import React from "react"; -import { disposeOnUnmount, observer } from "mobx-react"; +import { observer } from "mobx-react"; import { Redirect, Route, Router, Switch } from "react-router"; import { history } from "../navigation"; import { Notifications } from "./notifications"; @@ -42,10 +42,10 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte import { TabLayout, TabLayoutRoute } from "./layout/tab-layout"; import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog"; import { eventStore } from "./+events/event.store"; -import { computed, reaction, observable } from "mobx"; +import { reaction, computed, observable } from "mobx"; import { nodesStore } from "./+nodes/nodes.store"; import { podsStore } from "./+workloads-pods/pods.store"; -import { kubeWatchApi } from "../api/kube-watch-api"; +import { sum } from "lodash"; import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog"; import { CommandContainer } from "./command-palette/command-container"; @@ -76,26 +76,50 @@ export class App extends React.Component { whatInput.ask(); // Start to monitor user input device } - componentDidMount() { - disposeOnUnmount(this, [ - kubeWatchApi.subscribeStores([podsStore, nodesStore, eventStore], { - preload: true, - }), + @observable extensionRoutes: Map = new Map(); - reaction(() => this.warningsTotal, (count: number) => { - broadcastMessage(`cluster-warning-event-count:${getHostedCluster().id}`, count); - }), + async componentDidMount() { + const cluster = getHostedCluster(); + const promises: Promise[] = []; - reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => { - this.generateExtensionTabLayoutRoutes(rootItems); - }, { - fireImmediately: true - }) - ]); + if (isAllowedResource("events") && isAllowedResource("pods")) { + promises.push(eventStore.loadAll()); + promises.push(podsStore.loadAll()); + } + + if (isAllowedResource("nodes")) { + promises.push(nodesStore.loadAll()); + } + await Promise.all(promises); + + if (eventStore.isLoaded && podsStore.isLoaded) { + eventStore.subscribe(); + podsStore.subscribe(); + } + + if (nodesStore.isLoaded) { + nodesStore.subscribe(); + } + + reaction(() => this.warningsCount, (count) => { + broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count); + }); + + reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => { + this.generateExtensionTabLayoutRoutes(rootItems); + }, { + fireImmediately: true + }); } - @computed get warningsTotal(): number { - return nodesStore.getWarningsCount() + eventStore.getWarningsCount(); + @computed + get warningsCount() { + let warnings = sum(nodesStore.items + .map(node => node.getWarningConditions().length)); + + warnings = warnings + eventStore.getWarnings().length; + + return warnings; } get startURL() { @@ -128,26 +152,6 @@ export class App extends React.Component { return routes; } - renderExtensionTabLayoutRoutes() { - return clusterPageMenuRegistry.getRootItems().map((menu, index) => { - const tabRoutes = this.getTabLayoutRoutes(menu); - - if (tabRoutes.length > 0) { - const pageComponent = () => ; - - return tab.routePath)}/>; - } else { - const page = clusterPageRegistry.getByPageTarget(menu.target); - - if (page) { - return ; - } - } - }); - } - - @observable extensionRoutes: Map = new Map(); - generateExtensionTabLayoutRoutes(rootItems: ClusterPageMenuRegistration[]) { rootItems.forEach((menu, index) => { let route = this.extensionRoutes.get(menu); @@ -178,6 +182,10 @@ export class App extends React.Component { } } + renderExtensionTabLayoutRoutes() { + return Array.from(this.extensionRoutes.values()); + } + renderExtensionRoutes() { return clusterPageRegistry.getItems().map((page, index) => { const menu = clusterPageMenuRegistry.getByPage(page); diff --git a/src/renderer/components/item-object-list/item-list-layout.tsx b/src/renderer/components/item-object-list/item-list-layout.tsx index b13d496064..6b4ff4fd16 100644 --- a/src/renderer/components/item-object-list/item-list-layout.tsx +++ b/src/renderer/components/item-object-list/item-list-layout.tsx @@ -2,7 +2,7 @@ import "./item-list-layout.scss"; import groupBy from "lodash/groupBy"; import React, { ReactNode } from "react"; -import { computed, observable, reaction, toJS } from "mobx"; +import { computed, IReactionDisposer, observable, reaction, toJS } from "mobx"; import { disposeOnUnmount, observer } from "mobx-react"; import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog"; import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table"; @@ -12,6 +12,7 @@ import { NoItems } from "../no-items"; import { Spinner } from "../spinner"; import { ItemObject, ItemStore } from "../../item.store"; import { SearchInputUrl } from "../input"; +import { namespaceStore } from "../+namespaces/namespace.store"; import { Filter, FilterType, pageFilters } from "./page-filters.store"; import { PageFiltersList } from "./page-filters-list"; import { PageFiltersSelect } from "./page-filters-select"; @@ -21,7 +22,6 @@ import { MenuActions } from "../menu/menu-actions"; import { MenuItem } from "../menu"; import { Checkbox } from "../checkbox"; import { userStore } from "../../../common/user-store"; -import { namespaceStore } from "../+namespaces/namespace.store"; // todo: refactor, split to small re-usable components @@ -40,7 +40,6 @@ export interface ItemListLayoutProps { className: IClassName; store: ItemStore; dependentStores?: ItemStore[]; - preloadStores?: boolean; isClusterScoped?: boolean; hideFilters?: boolean; searchFilters?: SearchFilter[]; @@ -83,7 +82,6 @@ const defaultProps: Partial = { isSelectable: true, isConfigurable: false, copyClassNameFromHeadCells: true, - preloadStores: true, dependentStores: [], filterItems: [], hasDetailsView: true, @@ -99,6 +97,10 @@ interface ItemListLayoutUserSettings { export class ItemListLayout extends React.Component { static defaultProps = defaultProps as object; + private watchDisposers: IReactionDisposer[] = []; + + @observable isUnmounting = false; + @observable userSettings: ItemListLayoutUserSettings = { showAppliedFilters: false, }; @@ -117,28 +119,54 @@ export class ItemListLayout extends React.Component { } async componentDidMount() { - const { isClusterScoped, isConfigurable, tableId, preloadStores } = this.props; + const { isClusterScoped, isConfigurable, tableId } = this.props; if (isConfigurable && !tableId) { throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified"); } - if (preloadStores) { - this.loadStores(); + this.loadStores(); - if (!isClusterScoped) { - disposeOnUnmount(this, [ - namespaceStore.onContextChange(() => this.loadStores()) - ]); + if (!isClusterScoped) { + disposeOnUnmount(this, [ + namespaceStore.onContextChange(() => this.loadStores()) + ]); + } + } + + async componentWillUnmount() { + this.isUnmounting = true; + this.unsubscribeStores(); + } + + @computed get stores() { + const { store, dependentStores } = this.props; + + return new Set([store, ...dependentStores]); + } + + async loadStores() { + this.unsubscribeStores(); // reset first + + // load + for (const store of this.stores) { + if (this.isUnmounting) { + this.unsubscribeStores(); + break; + } + + try { + await store.loadAll(); + this.watchDisposers.push(store.subscribe()); + } catch (error) { + console.error("loading store error", error); } } } - private loadStores() { - const { store, dependentStores } = this.props; - const stores = Array.from(new Set([store, ...dependentStores])); - - stores.forEach(store => store.loadAll()); + unsubscribeStores() { + this.watchDisposers.forEach(dispose => dispose()); + this.watchDisposers.length = 0; } private filterCallbacks: { [type: string]: ItemsFilter } = { diff --git a/src/renderer/components/kube-object/kube-object-list-layout.tsx b/src/renderer/components/kube-object/kube-object-list-layout.tsx index 226023fc8d..25922f0f72 100644 --- a/src/renderer/components/kube-object/kube-object-list-layout.tsx +++ b/src/renderer/components/kube-object/kube-object-list-layout.tsx @@ -1,17 +1,15 @@ import React from "react"; import { computed } from "mobx"; -import { disposeOnUnmount, observer } from "mobx-react"; +import { observer } from "mobx-react"; import { cssNames } from "../../utils"; import { KubeObject } from "../../api/kube-object"; import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout"; import { KubeObjectStore } from "../../kube-object.store"; import { KubeObjectMenu } from "./kube-object-menu"; import { kubeSelectedUrlParam, showDetails } from "./kube-object-details"; -import { kubeWatchApi } from "../../api/kube-watch-api"; export interface KubeObjectListLayoutProps extends ItemListLayoutProps { store: KubeObjectStore; - dependentStores?: KubeObjectStore[]; } @observer @@ -20,17 +18,6 @@ export class KubeObjectListLayout extends React.Component { if (this.props.onDetails) { this.props.onDetails(item); @@ -46,7 +33,6 @@ export class KubeObjectListLayout extends React.Component { diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 760ebd3335..8a75bc7ae6 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -2,10 +2,10 @@ import type { Cluster } from "../main/cluster"; import { action, observable, reaction } from "mobx"; import { autobind } from "./utils"; import { KubeObject } from "./api/kube-object"; -import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api"; +import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; -import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api"; +import { IKubeApiQueryParams, KubeApi } from "./api/kube-api"; import { KubeJsonApiData } from "./api/kube-json-api"; export interface KubeObjectStoreLoadingParams { @@ -22,6 +22,7 @@ export abstract class KubeObjectStore extends ItemSt constructor() { super(); this.bindWatchEventsUpdater(); + kubeWatchApi.addListener(this, this.onWatchApiEvent); } get query(): IKubeApiQueryParams { @@ -156,7 +157,7 @@ export abstract class KubeObjectStore extends ItemSt @action async loadFromPath(resourcePath: string) { - const { namespace, name } = parseKubeApi(resourcePath); + const { namespace, name } = KubeApi.parseApi(resourcePath); return this.load({ name, namespace }); } @@ -194,29 +195,29 @@ export abstract class KubeObjectStore extends ItemSt } // collect items from watch-api events to avoid UI blowing up with huge streams of data - protected eventsBuffer = observable.array>([], { deep: false }); + protected eventsBuffer = observable>([], { deep: false }); protected bindWatchEventsUpdater(delay = 1000) { - kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage) => { - if (!this.isLoaded || store !== this) return; - this.eventsBuffer.push(data); - }); - - reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, { + return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, { delay }); } - getSubscribeApis(): KubeApi[] { - return [this.api]; + subscribe(apis = [this.api]) { + return KubeApi.watchAll(...apis); } - subscribe(apis = this.getSubscribeApis()) { - return kubeWatchApi.subscribeApi(apis); + protected onWatchApiEvent(evt: IKubeWatchEvent) { + if (!this.isLoaded) return; + this.eventsBuffer.push(evt); } @action protected updateFromEventsBuffer() { + if (!this.eventsBuffer.length) { + return; + } + // create latest non-observable copy of items to apply updates in one action (==single render) const items = this.items.toJS(); for (const { type, object } of this.eventsBuffer.clear()) {