From 557d96d4849e65e378b321f081a78af27d75080d Mon Sep 17 00:00:00 2001 From: Jari Kolehmainen Date: Fri, 12 Feb 2021 16:37:41 +0200 Subject: [PATCH] Handle suspend/resume error from watch stream read (#2136) * handle suspend/resume error from stream read Signed-off-by: Jari Kolehmainen * add missing types Signed-off-by: Jari Kolehmainen --- package.json | 3 +- src/renderer/api/kube-api.ts | 23 +++++--- src/renderer/utils/readableStream.ts | 87 ++++++++++++++++++++++++++++ yarn.lock | 8 --- 4 files changed, 105 insertions(+), 16 deletions(-) create mode 100644 src/renderer/utils/readableStream.ts diff --git a/package.json b/package.json index f663accadc..05da6d14a5 100644 --- a/package.json +++ b/package.json @@ -222,7 +222,7 @@ "react": "^17.0.1", "react-dom": "^17.0.1", "react-router": "^5.2.0", - "readable-web-to-node-stream": "^3.0.1", + "readable-stream": "^3.6.0", "request": "^2.88.2", "request-promise-native": "^1.0.8", "semver": "^7.3.2", @@ -277,6 +277,7 @@ "@types/react-router-dom": "^5.1.6", "@types/react-select": "^3.0.13", "@types/react-window": "^1.8.2", + "@types/readable-stream": "^2.3.9", "@types/request": "^2.48.5", "@types/request-promise-native": "^1.0.17", "@types/semver": "^7.2.0", diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index 97cfb0522b..a880cc2406 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -10,8 +10,8 @@ import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api"; import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object"; import byline from "byline"; -import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; import { IKubeWatchEvent } from "./kube-watch-api"; +import { ReadableWebToNodeStream } from "../utils/readableStream"; export interface IKubeApiOptions { /** @@ -373,7 +373,13 @@ export class KubeApi { opts.abortController = new AbortController(); } let errorReceived = false; + let timedRetry: NodeJS.Timeout; const { abortController, namespace, callback } = opts; + + abortController.signal.addEventListener("abort", () => { + clearTimeout(timedRetry); + }); + const watchUrl = this.getWatchUrl(namespace); const responsePromise = this.request.getResponse(watchUrl, null, { signal: abortController.signal @@ -387,14 +393,17 @@ export class KubeApi { } const nodeStream = new ReadableWebToNodeStream(response.body); - nodeStream.on("end", () => { - if (errorReceived) return; // kubernetes errors should be handled in a callback + ["end", "close", "error"].forEach((eventName) => { + nodeStream.on(eventName, () => { + if (errorReceived) return; // kubernetes errors should be handled in a callback - setTimeout(() => { // we did not get any kubernetes errors so let's retry - if (abortController.signal.aborted) return; + clearTimeout(timedRetry); + timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry + if (abortController.signal.aborted) return; - this.watch({...opts, namespace, callback}); - }, 1000); + this.watch({...opts, namespace, callback}); + }, 1000); + }); }); const stream = byline(nodeStream); diff --git a/src/renderer/utils/readableStream.ts b/src/renderer/utils/readableStream.ts new file mode 100644 index 0000000000..3b51106427 --- /dev/null +++ b/src/renderer/utils/readableStream.ts @@ -0,0 +1,87 @@ +import { Readable } from "readable-stream"; + +/** + * ReadableWebToNodeStream + * + * Copied from https://github.com/Borewit/readable-web-to-node-stream + * + * Adds read error handler + * + * */ +export class ReadableWebToNodeStream extends Readable { + + public bytesRead = 0; + public released = false; + + /** + * Default web API stream reader + * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader + */ + private reader: ReadableStreamReader; + private pendingRead: Promise; + + /** + * + * @param stream Readable​Stream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream + */ + constructor(stream: ReadableStream) { + super(); + this.reader = stream.getReader(); + } + + /** + * Implementation of readable._read(size). + * When readable._read() is called, if data is available from the resource, + * the implementation should begin pushing that data into the read queue + * https://nodejs.org/api/stream.html#stream_readable_read_size_1 + */ + public async _read() { + // Should start pushing data into the queue + // Read data from the underlying Web-API-readable-stream + if (this.released) { + this.push(null); // Signal EOF + + return; + } + + try { + this.pendingRead = this.reader.read(); + const data = await this.pendingRead; + + // clear the promise before pushing pushing new data to the queue and allow sequential calls to _read() + delete this.pendingRead; + + if (data.done || this.released) { + this.push(null); // Signal EOF + } else { + this.bytesRead += data.value.length; + this.push(data.value); // Push new data to the queue + } + } catch(error) { + this.push(null); // Signal EOF + } + } + + /** + * If there is no unresolved read call to Web-API Readable​Stream immediately returns; + * otherwise will wait until the read is resolved. + */ + public async waitForReadToComplete() { + if (this.pendingRead) { + await this.pendingRead; + } + } + + /** + * Close wrapper + */ + public async close(): Promise { + await this.syncAndRelease(); + } + + private async syncAndRelease() { + this.released = true; + await this.waitForReadToComplete(); + await this.reader.releaseLock(); + } +} diff --git a/yarn.lock b/yarn.lock index cd3384eba5..16ac2ca4f8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -11464,14 +11464,6 @@ readable-stream@~1.1.10: isarray "0.0.1" string_decoder "~0.10.x" -readable-web-to-node-stream@^3.0.1: - version "3.0.1" - resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845" - integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA== - dependencies: - "@types/readable-stream" "^2.3.9" - readable-stream "^3.6.0" - readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309"