1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

store subscribing refactoring -- part 3

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-23 19:28:27 +02:00
parent 7b4e060067
commit b690a27ebe
2 changed files with 80 additions and 69 deletions

View File

@ -1,11 +1,14 @@
// Kubernetes watch-api client
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import type { Cluster } from "../../main/cluster";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import type { NamespaceStore } from "../components/+namespaces/namespace.store";
import { computed, observable, reaction } from "mobx";
import debounce from "lodash/debounce";
import { comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
@ -33,29 +36,58 @@ export interface IKubeWatchLog {
@autobind()
export class KubeWatchApi {
protected stream: ReadableStream<string>; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
protected subscribers = observable.map<KubeApi, number>();
protected reconnectTimeoutMs = 5000;
protected maxReconnectsOnError = 10;
protected jsonBuffer = "";
protected splitter = "\n";
private cluster: Cluster;
private namespaceStore: NamespaceStore;
private requestId = 0;
private reader: ReadableStreamReader<string>;
private subscribers = observable.map<KubeApi, number>();
private splitter = "\n";
private reconnectTimeoutMs = 5000;
private maxReconnectsOnError = 10;
// events
onMessage = new EventEmitter<[IKubeWatchMessage]>();
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
constructor() {
this.init();
}
private async init() {
const { getHostedCluster } = await import("../../common/cluster-store");
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
this.cluster = getHostedCluster();
this.namespaceStore = namespaceStore;
this.bindAutoConnect();
}
private bindAutoConnect() {
return reaction(() => this.activeApis, () => this.connect(), {
const connect = debounce(() => this.connect(), 1000);
return reaction(() => this.activeApis, connect, {
fireImmediately: true,
delay: 500,
equals: comparer.structural,
});
}
@computed get activeApis() {
return Array.from(this.subscribers.keys());
@computed get activeApis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat();
}
getSubscribersCount(api: KubeApi) {
@ -116,97 +148,76 @@ export class KubeWatchApi {
return unsubscribe;
}
protected async resolveCluster(): Promise<Cluster> {
const { getHostedCluster } = await import("../../common/cluster-store");
protected async connect(apis = this.activeApis) {
this.disconnect(); // close active connections first
return getHostedCluster();
}
protected async getRequestPayload(): Promise<IWatchRoutePayload> {
const cluster = await this.resolveCluster();
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
return {
apis: this.activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat()
};
}
protected async connect() {
this.disconnect(); // close active connection first
const payload = await this.getRequestPayload();
if (!payload.apis.length) {
if (!apis.length) {
return;
}
this.log({
message: "Connecting",
meta: payload,
meta: { apis }
});
try {
const req = await fetch(`${apiPrefix}/watch`, {
const requestId = ++this.requestId;
const abortController = new AbortController();
const request = await fetch(`${apiPrefix}/watch`, {
method: "POST",
body: JSON.stringify(payload),
keepalive: true,
body: JSON.stringify({ apis } as IWatchRoutePayload),
signal: abortController.signal,
headers: {
"content-type": "application/json"
}
});
this.stream = req.body.pipeThrough(new TextDecoderStream());
this.stream.cancel = () => reader.cancel();
// request above is stale since new request-id has been issued
if (this.requestId !== requestId) {
abortController.abort();
return;
}
const reader = this.stream.getReader();
let jsonBuffer = "";
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.reader = reader;
while (true) {
const { done, value } = await reader.read();
if (done) break;
this.processStreamChunk(value);
if (done) break; // exit
const events = (jsonBuffer + value).split(this.splitter);
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
}
}
protected async processStreamChunk(chunk: string) {
const { jsonBuffer, splitter } = this;
const eventsBuffer = (jsonBuffer + chunk).split(splitter);
let jsonEvent: string;
protected disconnect() {
this.reader?.cancel();
this.reader = null;
}
while (jsonEvent = eventsBuffer.shift()) {
// process received stream events, returns unprocessed buffer chunk if any
protected processBuffer(events: string[]): string {
for (let json of events) {
try {
const kubeEvent: IKubeWatchEvent = JSON.parse(jsonEvent);
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
this.onMessage.emit(message);
} catch (error) {
eventsBuffer.unshift(jsonEvent); // put unparsed json back to buffer
break;
return json;
}
}
// save last unprocessed json-tail or reset buffer otherwise
this.jsonBuffer = eventsBuffer.join(splitter);
}
protected async disconnect() {
this.stream?.cancel();
this.stream = null;
return "";
}
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {

View File

@ -207,7 +207,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
return [this.api];
}
async subscribe(apis = this.getSubscribeApis()) {
async subscribe(apis = this.getSubscribeApis()): Promise<() => void> {
const cluster = await this.resolveCluster();
const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind));