1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

fix: parse json stream chunks at client-side (might be partial, depends on network speed)

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-19 17:25:15 +02:00
parent 3a3bd26fbe
commit 9b031b7c68
2 changed files with 34 additions and 49 deletions

View File

@ -36,6 +36,8 @@ class ApiWatcher {
this.response = response; this.response = response;
} }
// FIXME: add delay to kube-watch-api requests to avoid possible ECONNRESET error
// https://stackoverflow.com/questions/17245881/how-do-i-debug-error-econnreset-in-node-js
public async start() { public async start() {
if (this.processor) { if (this.processor) {
clearInterval(this.processor); clearInterval(this.processor);
@ -119,15 +121,14 @@ class WatchRoute extends LensApi {
}); });
request.raw.req.on("close", () => { request.raw.req.on("close", () => {
logger.debug("Watch request closed"); logger.info("Watch request closed");
watchers.map(watcher => watcher.stop()); watchers.map(watcher => watcher.stop());
}); });
request.raw.req.on("end", () => { request.raw.req.on("end", () => {
logger.debug("Watch request ended"); logger.info("Watch request ended");
watchers.map(watcher => watcher.stop()); watchers.map(watcher => watcher.stop());
}); });
} }
} }

View File

@ -28,10 +28,12 @@ export interface IKubeWatchLog {
@autobind() @autobind()
export class KubeWatchApi { export class KubeWatchApi {
protected stream: ReadableStream<Uint8Array>; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams protected stream: ReadableStream<string>; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
protected subscribers = observable.map<KubeApi, number>(); protected subscribers = observable.map<KubeApi, number>();
protected reconnectTimeoutMs = 5000; protected reconnectTimeoutMs = 5000;
protected maxReconnectsOnError = 10; protected maxReconnectsOnError = 10;
protected jsonBuffer = "";
protected splitter = "\n";
// events // events
onMessage = new EventEmitter<[IKubeWatchMessage]>(); onMessage = new EventEmitter<[IKubeWatchMessage]>();
@ -119,64 +121,46 @@ export class KubeWatchApi {
} }
}); });
const reader = req.body.getReader(); this.stream = req.body.pipeThrough(new TextDecoderStream());
const handleEvent = this.handleStreamEvent.bind(this); this.stream.cancel = () => reader.cancel();
this.stream = new ReadableStream({ const reader = this.stream.getReader();
start(controller) {
return reader.read().then(function processEvent({ done, value }): Promise<void> {
if (done) {
controller.close();
return; while (true) {
} const { done, value } = await reader.read();
handleEvent(value);
controller.enqueue(value);
return reader.read().then(processEvent); if (done) break;
}); this.processStreamChunk(value);
}, }
cancel() {
reader.cancel();
}
});
} catch (error) { } catch (error) {
this.log({ this.log({ message: error });
message: new Error("connection error"),
meta: { error }
});
} }
} }
protected async disconnect() { protected async processStreamChunk(chunk: string) {
if (this.stream) { const { jsonBuffer, splitter } = this;
this.stream.cancel(); const eventsBuffer = (jsonBuffer + chunk).split(splitter);
this.stream = null; let jsonEvent: string;
}
}
protected handleStreamEvent(chunk: Uint8Array) { while (jsonEvent = eventsBuffer.shift()) {
const jsonText = new TextDecoder().decode(chunk);
if (!jsonText) {
return;
}
// decoded json might contain multiple kube-events at a time
const events = jsonText.trim().split("\n");
events.forEach(event => {
try { try {
const message = this.getMessage(JSON.parse(event)); const kubeEvent: IKubeWatchEvent = JSON.parse(jsonEvent);
const message = this.getMessage(kubeEvent);
this.onMessage.emit(message); this.onMessage.emit(message);
} catch (error) { } catch (error) {
this.log({ eventsBuffer.unshift(jsonEvent); // put unparsed json back to buffer
message: new Error("failed to parse watch-api event"), break;
meta: { error, event },
});
} }
}); }
// save last unprocessed json-tail or reset buffer otherwise
this.jsonBuffer = eventsBuffer.join(splitter);
}
protected async disconnect() {
this.stream?.cancel();
this.stream = null;
} }
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {