diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index fee40d4872..64af0c2e5a 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -40,15 +40,34 @@ export class KubeWatchApi { private namespaceStore: NamespaceStore; private requestId = 0; + private isConnected = false; private reader: ReadableStreamReader; private subscribers = observable.map(); - private splitter = "\n"; - private reconnectTimeoutMs = 5000; - private maxReconnectsOnError = 10; // events public onMessage = new EventEmitter<[IKubeWatchMessage]>(); + @computed get isActive(): boolean { + return this.apis.length > 0; + } + + @computed get apis(): string[] { + const { cluster, namespaceStore } = this; + const activeApis = Array.from(this.subscribers.keys()); + + return activeApis.map(api => { + if (!cluster.isAllowedResource(api.kind)) { + return []; + } + + if (api.isNamespaced) { + return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); + } else { + return api.getWatchUrl(); + } + }).flat(); + } + constructor() { this.init(); } @@ -67,27 +86,14 @@ export class KubeWatchApi { private bindAutoConnect() { const connect = debounce(() => this.connect(), 1000); - return reaction(() => this.activeApis, connect, { + reaction(() => this.apis, connect, { fireImmediately: true, equals: comparer.structural, }); - } - @computed get activeApis(): string[] { - const { cluster, namespaceStore } = this; - const activeApis = Array.from(this.subscribers.keys()); - - return activeApis.map(api => { - if (!cluster.isAllowedResource(api.kind)) { - return []; - } - - if (api.isNamespaced) { - return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace)); - } else { - return api.getWatchUrl(); - } - }).flat(); + window.addEventListener("online", () => this.connect()); + window.addEventListener("offline", () => this.disconnect()); + setInterval(() => this.connectionCheck(), 30000); } getSubscribersCount(api: KubeApi) { @@ -149,10 +155,18 @@ export class KubeWatchApi { return unsubscribe; } - protected async connect(apis = this.activeApis) { + protected connectionCheck() { + if (this.isConnected) return; + + return this.connect(); + } + + protected async connect(apis = this.apis) { this.disconnect(); // close active connections first - if (!apis.length) { + if (!navigator.onLine || !apis.length) { + this.isConnected = false; + return; } @@ -185,6 +199,7 @@ export class KubeWatchApi { const stream = request.body.pipeThrough(new TextDecoderStream()); const reader = stream.getReader(); + this.isConnected = true; this.reader = reader; while (true) { @@ -192,18 +207,21 @@ export class KubeWatchApi { if (done) break; // exit - const events = (jsonBuffer + value).split(this.splitter); + const events = (jsonBuffer + value).split("\n"); jsonBuffer = this.processBuffer(events); } } catch (error) { this.log({ message: error }); + } finally { + this.isConnected = false; } } protected disconnect() { this.reader?.cancel(); this.reader = null; + this.isConnected = false; } // process received stream events, returns unprocessed buffer chunk if any @@ -272,14 +290,12 @@ export class KubeWatchApi { this.connect(); } catch (error) { this.log({ - message: new Error("Failed to reconnect on stream end"), - meta: { error, event }, + message: new Error(`Failed to connect on single stream end: ${error}`), + meta: { event, error }, }); - if (this.subscribers.size > 0) { - setTimeout(() => { - this.onServerStreamEnd(event); - }, 1000); + if (this.isActive) { + setTimeout(() => this.onServerStreamEnd(event), 1000); } } }