mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
fine-tuning
Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
parent
1a3a215118
commit
3a3bd26fbe
@ -88,7 +88,7 @@ class ApiWatcher {
|
||||
}
|
||||
|
||||
private sendEvent(evt: IKubeWatchEvent) {
|
||||
this.response.write(JSON.stringify(evt) + "\n");
|
||||
this.response.write(`${JSON.stringify(evt)}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,18 +1,18 @@
|
||||
// Kubernetes watch-api consumer
|
||||
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
|
||||
import type { KubeObjectStore } from "../kube-object.store";
|
||||
import type { KubeObject } from "./kube-object";
|
||||
// Kubernetes watch-api client
|
||||
|
||||
import type { Cluster } from "../../main/cluster";
|
||||
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
|
||||
|
||||
import type { KubeObject } from "./kube-object";
|
||||
import { computed, observable, reaction } from "mobx";
|
||||
import { autobind, EventEmitter } from "../utils";
|
||||
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
|
||||
import { ensureObjectSelfLink, KubeApi } from "./kube-api";
|
||||
import { getHostedCluster } from "../../common/cluster-store";
|
||||
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
|
||||
import { KubeObjectStore } from "../kube-object.store";
|
||||
import { apiPrefix, isProduction } from "../../common/vars";
|
||||
import { apiManager } from "./api-manager";
|
||||
import logger from "../../main/logger";
|
||||
|
||||
export { IKubeWatchEvent, IKubeWatchEventStreamEnd }
|
||||
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
|
||||
|
||||
export interface IKubeWatchMessage<T extends KubeObject = any> {
|
||||
data?: IKubeWatchEvent<KubeJsonApiData>
|
||||
@ -68,22 +68,29 @@ export class KubeWatchApi {
|
||||
});
|
||||
}
|
||||
|
||||
protected async getWatchRoutePayload(): Promise<IWatchRoutePayload> {
|
||||
protected async resolveCluster(): Promise<Cluster> {
|
||||
const { getHostedCluster } = await import("../../common/cluster-store");
|
||||
|
||||
return getHostedCluster();
|
||||
}
|
||||
|
||||
protected async getRequestPayload(): Promise<IWatchRoutePayload> {
|
||||
const cluster = await this.resolveCluster();
|
||||
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
|
||||
|
||||
await namespaceStore.whenReady;
|
||||
const { isAdmin } = getHostedCluster();
|
||||
|
||||
return {
|
||||
apis: this.activeApis.map(api => {
|
||||
if (isAdmin && !api.isNamespaced) {
|
||||
return api.getWatchUrl();
|
||||
if (!cluster.isAllowedResource(api.kind)) {
|
||||
return [];
|
||||
}
|
||||
|
||||
if (api.isNamespaced) {
|
||||
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
|
||||
} else {
|
||||
return api.getWatchUrl();
|
||||
}
|
||||
|
||||
return [];
|
||||
}).flat()
|
||||
};
|
||||
}
|
||||
@ -91,7 +98,7 @@ export class KubeWatchApi {
|
||||
protected async connect() {
|
||||
this.disconnect(); // close active connection first
|
||||
|
||||
const payload = await this.getWatchRoutePayload();
|
||||
const payload = await this.getRequestPayload();
|
||||
|
||||
if (!payload.apis.length) {
|
||||
return;
|
||||
@ -120,10 +127,12 @@ export class KubeWatchApi {
|
||||
return reader.read().then(function processEvent({ done, value }): Promise<void> {
|
||||
if (done) {
|
||||
controller.close();
|
||||
|
||||
return;
|
||||
}
|
||||
handleEvent(value);
|
||||
controller.enqueue(value);
|
||||
|
||||
return reader.read().then(processEvent);
|
||||
});
|
||||
},
|
||||
@ -148,24 +157,26 @@ export class KubeWatchApi {
|
||||
|
||||
protected handleStreamEvent(chunk: Uint8Array) {
|
||||
const jsonText = new TextDecoder().decode(chunk);
|
||||
|
||||
if (!jsonText) {
|
||||
return;
|
||||
}
|
||||
|
||||
// decoded json might contain multiple kube-events at a time
|
||||
const events = jsonText.split("\n");
|
||||
const events = jsonText.trim().split("\n");
|
||||
|
||||
events.forEach(kubeEvent => {
|
||||
events.forEach(event => {
|
||||
try {
|
||||
const message = this.getMessage(JSON.parse(kubeEvent));
|
||||
const message = this.getMessage(JSON.parse(event));
|
||||
|
||||
this.onMessage.emit(message);
|
||||
} catch (error) {
|
||||
this.log({
|
||||
message: new Error("failed to parse watch-api event"),
|
||||
meta: { error, jsonText },
|
||||
meta: { error, event },
|
||||
});
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
|
||||
@ -174,6 +185,7 @@ export class KubeWatchApi {
|
||||
switch (event.type) {
|
||||
case "ADDED":
|
||||
case "DELETED":
|
||||
|
||||
case "MODIFIED": {
|
||||
const data = event as IKubeWatchEvent<KubeJsonApiData>;
|
||||
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
|
||||
@ -184,6 +196,7 @@ export class KubeWatchApi {
|
||||
ensureObjectSelfLink(api, data.object);
|
||||
|
||||
const { namespace, resourceVersion } = data.object.metadata;
|
||||
|
||||
api.setResourceVersion(namespace, resourceVersion);
|
||||
api.setResourceVersion("", resourceVersion);
|
||||
|
||||
@ -232,13 +245,14 @@ export class KubeWatchApi {
|
||||
protected log({ message, meta }: IKubeWatchLog) {
|
||||
if (isProduction) return;
|
||||
|
||||
const logMessage = `[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
|
||||
const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
|
||||
const isError = message instanceof Error;
|
||||
const textStyle = `font-weight: bold; ${isError ? "color: red;" : ""}`;
|
||||
|
||||
if (isError) {
|
||||
logger.error(logMessage, meta);
|
||||
console.error(logMessage, textStyle, meta);
|
||||
} else {
|
||||
logger.log({ message: logMessage, level: "info" });
|
||||
console.info(logMessage, textStyle, meta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,7 +196,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage<T>) => {
|
||||
if (!this.isLoaded || store !== this) return;
|
||||
this.eventsBuffer.push(data);
|
||||
})
|
||||
});
|
||||
|
||||
reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, {
|
||||
delay
|
||||
|
||||
Loading…
Reference in New Issue
Block a user