1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Handle suspend/resume error from watch stream read (#2136)

* handle suspend/resume error from stream read

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* add missing types

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-12 16:37:41 +02:00 committed by GitHub
parent 4bc4fe3a24
commit 557d96d484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 105 additions and 16 deletions

View File

@ -222,7 +222,7 @@
"react": "^17.0.1",
"react-dom": "^17.0.1",
"react-router": "^5.2.0",
"readable-web-to-node-stream": "^3.0.1",
"readable-stream": "^3.6.0",
"request": "^2.88.2",
"request-promise-native": "^1.0.8",
"semver": "^7.3.2",
@ -277,6 +277,7 @@
"@types/react-router-dom": "^5.1.6",
"@types/react-select": "^3.0.13",
"@types/react-window": "^1.8.2",
"@types/readable-stream": "^2.3.9",
"@types/request": "^2.48.5",
"@types/request-promise-native": "^1.0.17",
"@types/semver": "^7.2.0",

View File

@ -10,8 +10,8 @@ import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object";
import byline from "byline";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { IKubeWatchEvent } from "./kube-watch-api";
import { ReadableWebToNodeStream } from "../utils/readableStream";
export interface IKubeApiOptions<T extends KubeObject> {
/**
@ -373,7 +373,13 @@ export class KubeApi<T extends KubeObject = any> {
opts.abortController = new AbortController();
}
let errorReceived = false;
let timedRetry: NodeJS.Timeout;
const { abortController, namespace, callback } = opts;
abortController.signal.addEventListener("abort", () => {
clearTimeout(timedRetry);
});
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, null, {
signal: abortController.signal
@ -387,15 +393,18 @@ export class KubeApi<T extends KubeObject = any> {
}
const nodeStream = new ReadableWebToNodeStream(response.body);
nodeStream.on("end", () => {
["end", "close", "error"].forEach((eventName) => {
nodeStream.on(eventName, () => {
if (errorReceived) return; // kubernetes errors should be handled in a callback
setTimeout(() => { // we did not get any kubernetes errors so let's retry
clearTimeout(timedRetry);
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
if (abortController.signal.aborted) return;
this.watch({...opts, namespace, callback});
}, 1000);
});
});
const stream = byline(nodeStream);

View File

@ -0,0 +1,87 @@
import { Readable } from "readable-stream";
/**
* ReadableWebToNodeStream
*
* Copied from https://github.com/Borewit/readable-web-to-node-stream
*
* Adds read error handler
*
* */
export class ReadableWebToNodeStream extends Readable {
public bytesRead = 0;
public released = false;
/**
* Default web API stream reader
* https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
*/
private reader: ReadableStreamReader;
private pendingRead: Promise<any>;
/**
*
* @param stream ReadableStream: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
*/
constructor(stream: ReadableStream) {
super();
this.reader = stream.getReader();
}
/**
* Implementation of readable._read(size).
* When readable._read() is called, if data is available from the resource,
* the implementation should begin pushing that data into the read queue
* https://nodejs.org/api/stream.html#stream_readable_read_size_1
*/
public async _read() {
// Should start pushing data into the queue
// Read data from the underlying Web-API-readable-stream
if (this.released) {
this.push(null); // Signal EOF
return;
}
try {
this.pendingRead = this.reader.read();
const data = await this.pendingRead;
// clear the promise before pushing pushing new data to the queue and allow sequential calls to _read()
delete this.pendingRead;
if (data.done || this.released) {
this.push(null); // Signal EOF
} else {
this.bytesRead += data.value.length;
this.push(data.value); // Push new data to the queue
}
} catch(error) {
this.push(null); // Signal EOF
}
}
/**
* If there is no unresolved read call to Web-API ReadableStream immediately returns;
* otherwise will wait until the read is resolved.
*/
public async waitForReadToComplete() {
if (this.pendingRead) {
await this.pendingRead;
}
}
/**
* Close wrapper
*/
public async close(): Promise<void> {
await this.syncAndRelease();
}
private async syncAndRelease() {
this.released = true;
await this.waitForReadToComplete();
await this.reader.releaseLock();
}
}

View File

@ -11464,14 +11464,6 @@ readable-stream@~1.1.10:
isarray "0.0.1"
string_decoder "~0.10.x"
readable-web-to-node-stream@^3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845"
integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA==
dependencies:
"@types/readable-stream" "^2.3.9"
readable-stream "^3.6.0"
readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309"