diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index b736616aa0..cae6e5431b 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -1,11 +1,14 @@ // Kubernetes watch-api client +// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams import type { Cluster } from "../../main/cluster"; import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; import type { KubeObject } from "./kube-object"; import type { KubeObjectStore } from "../kube-object.store"; +import type { NamespaceStore } from "../components/+namespaces/namespace.store"; -import { computed, observable, reaction } from "mobx"; +import debounce from "lodash/debounce"; +import { comparer, computed, observable, reaction } from "mobx"; import { autobind, EventEmitter } from "../utils"; import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; @@ -33,29 +36,58 @@ export interface IKubeWatchLog { @autobind() export class KubeWatchApi { - protected stream: ReadableStream; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams - protected subscribers = observable.map(); - protected reconnectTimeoutMs = 5000; - protected maxReconnectsOnError = 10; - protected jsonBuffer = ""; - protected splitter = "\n"; + private cluster: Cluster; + private namespaceStore: NamespaceStore; + + private requestId = 0; + private reader: ReadableStreamReader; + private subscribers = observable.map(); + private splitter = "\n"; + private reconnectTimeoutMs = 5000; + private maxReconnectsOnError = 10; // events - onMessage = new EventEmitter<[IKubeWatchMessage]>(); + public onMessage = new EventEmitter<[IKubeWatchMessage]>(); constructor() { + this.init(); + } + + private async init() { + const { getHostedCluster } = await import("../../common/cluster-store"); + const { namespaceStore } = await import("../components/+namespaces/namespace.store"); + + await namespaceStore.whenReady; + + this.cluster = getHostedCluster(); + this.namespaceStore = namespaceStore; this.bindAutoConnect(); } private bindAutoConnect() { - return reaction(() => this.activeApis, () => this.connect(), { + const connect = debounce(() => this.connect(), 1000); + + return reaction(() => this.activeApis, connect, { fireImmediately: true, - delay: 500, + equals: comparer.structural, }); } - @computed get activeApis() { - return Array.from(this.subscribers.keys()); + @computed get activeApis(): string[] { + const { cluster, namespaceStore } = this; + const activeApis = Array.from(this.subscribers.keys()); + + return activeApis.map(api => { + if (!cluster.isAllowedResource(api.kind)) { + return []; + } + + if (api.isNamespaced) { + return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); + } else { + return api.getWatchUrl(); + } + }).flat(); } getSubscribersCount(api: KubeApi) { @@ -116,97 +148,76 @@ export class KubeWatchApi { return unsubscribe; } - protected async resolveCluster(): Promise { - const { getHostedCluster } = await import("../../common/cluster-store"); + protected async connect(apis = this.activeApis) { + this.disconnect(); // close active connections first - return getHostedCluster(); - } - - protected async getRequestPayload(): Promise { - const cluster = await this.resolveCluster(); - const { namespaceStore } = await import("../components/+namespaces/namespace.store"); - - await namespaceStore.whenReady; - - return { - apis: this.activeApis.map(api => { - if (!cluster.isAllowedResource(api.kind)) { - return []; - } - - if (api.isNamespaced) { - return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); - } else { - return api.getWatchUrl(); - } - }).flat() - }; - } - - protected async connect() { - this.disconnect(); // close active connection first - - const payload = await this.getRequestPayload(); - - if (!payload.apis.length) { + if (!apis.length) { return; } this.log({ message: "Connecting", - meta: payload, + meta: { apis } }); try { - const req = await fetch(`${apiPrefix}/watch`, { + const requestId = ++this.requestId; + const abortController = new AbortController(); + + const request = await fetch(`${apiPrefix}/watch`, { method: "POST", - body: JSON.stringify(payload), - keepalive: true, + body: JSON.stringify({ apis } as IWatchRoutePayload), + signal: abortController.signal, headers: { "content-type": "application/json" } }); - this.stream = req.body.pipeThrough(new TextDecoderStream()); - this.stream.cancel = () => reader.cancel(); + // request above is stale since new request-id has been issued + if (this.requestId !== requestId) { + abortController.abort(); + return; + } - const reader = this.stream.getReader(); + let jsonBuffer = ""; + const stream = request.body.pipeThrough(new TextDecoderStream()); + const reader = stream.getReader(); + + this.reader = reader; while (true) { const { done, value } = await reader.read(); - if (done) break; - this.processStreamChunk(value); + if (done) break; // exit + + const events = (jsonBuffer + value).split(this.splitter); + + jsonBuffer = this.processBuffer(events); } } catch (error) { this.log({ message: error }); } } - protected async processStreamChunk(chunk: string) { - const { jsonBuffer, splitter } = this; - const eventsBuffer = (jsonBuffer + chunk).split(splitter); - let jsonEvent: string; + protected disconnect() { + this.reader?.cancel(); + this.reader = null; + } - while (jsonEvent = eventsBuffer.shift()) { + // process received stream events, returns unprocessed buffer chunk if any + protected processBuffer(events: string[]): string { + for (let json of events) { try { - const kubeEvent: IKubeWatchEvent = JSON.parse(jsonEvent); + const kubeEvent: IKubeWatchEvent = JSON.parse(json); const message = this.getMessage(kubeEvent); this.onMessage.emit(message); } catch (error) { - eventsBuffer.unshift(jsonEvent); // put unparsed json back to buffer - break; + return json; } } - // save last unprocessed json-tail or reset buffer otherwise - this.jsonBuffer = eventsBuffer.join(splitter); - } - - protected async disconnect() { - this.stream?.cancel(); - this.stream = null; + return ""; } protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 4242fe819a..2688e90f10 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -207,7 +207,7 @@ export abstract class KubeObjectStore extends ItemSt return [this.api]; } - async subscribe(apis = this.getSubscribeApis()) { + async subscribe(apis = this.getSubscribeApis()): Promise<() => void> { const cluster = await this.resolveCluster(); const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind));