diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index d7d522d14f..9174301ee5 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -36,6 +36,8 @@ class ApiWatcher { this.response = response; } + // FIXME: add delay to kube-watch-api requests to avoid possible ECONNRESET error + // https://stackoverflow.com/questions/17245881/how-do-i-debug-error-econnreset-in-node-js public async start() { if (this.processor) { clearInterval(this.processor); @@ -119,15 +121,14 @@ class WatchRoute extends LensApi { }); request.raw.req.on("close", () => { - logger.debug("Watch request closed"); + logger.info("Watch request closed"); watchers.map(watcher => watcher.stop()); }); request.raw.req.on("end", () => { - logger.debug("Watch request ended"); + logger.info("Watch request ended"); watchers.map(watcher => watcher.stop()); }); - } } diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 559ce4849b..3d440690f8 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -28,10 +28,12 @@ export interface IKubeWatchLog { @autobind() export class KubeWatchApi { - protected stream: ReadableStream; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams + protected stream: ReadableStream; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams protected subscribers = observable.map(); protected reconnectTimeoutMs = 5000; protected maxReconnectsOnError = 10; + protected jsonBuffer = ""; + protected splitter = "\n"; // events onMessage = new EventEmitter<[IKubeWatchMessage]>(); @@ -119,64 +121,46 @@ export class KubeWatchApi { } }); - const reader = req.body.getReader(); - const handleEvent = this.handleStreamEvent.bind(this); + this.stream = req.body.pipeThrough(new TextDecoderStream()); + this.stream.cancel = () => reader.cancel(); - this.stream = new ReadableStream({ - start(controller) { - return reader.read().then(function processEvent({ done, value }): Promise { - if (done) { - controller.close(); + const reader = this.stream.getReader(); - return; - } - handleEvent(value); - controller.enqueue(value); + while (true) { + const { done, value } = await reader.read(); - return reader.read().then(processEvent); - }); - }, - cancel() { - reader.cancel(); - } - }); + if (done) break; + this.processStreamChunk(value); + } } catch (error) { - this.log({ - message: new Error("connection error"), - meta: { error } - }); + this.log({ message: error }); } } - protected async disconnect() { - if (this.stream) { - this.stream.cancel(); - this.stream = null; - } - } + protected async processStreamChunk(chunk: string) { + const { jsonBuffer, splitter } = this; + const eventsBuffer = (jsonBuffer + chunk).split(splitter); + let jsonEvent: string; - protected handleStreamEvent(chunk: Uint8Array) { - const jsonText = new TextDecoder().decode(chunk); - - if (!jsonText) { - return; - } - - // decoded json might contain multiple kube-events at a time - const events = jsonText.trim().split("\n"); - - events.forEach(event => { + while (jsonEvent = eventsBuffer.shift()) { try { - const message = this.getMessage(JSON.parse(event)); + const kubeEvent: IKubeWatchEvent = JSON.parse(jsonEvent); + const message = this.getMessage(kubeEvent); this.onMessage.emit(message); } catch (error) { - this.log({ - message: new Error("failed to parse watch-api event"), - meta: { error, event }, - }); + eventsBuffer.unshift(jsonEvent); // put unparsed json back to buffer + break; } - }); + } + + // save last unprocessed json-tail or reset buffer otherwise + this.jsonBuffer = eventsBuffer.join(splitter); + } + + protected async disconnect() { + this.stream?.cancel(); + this.stream = null; } protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {