1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Revert "Watch-api streaming reworks (#1990)"

This reverts commit 078f952b36.

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-05 09:45:55 +02:00
parent 3afef111e8
commit 6f7cb4d568
20 changed files with 405 additions and 529 deletions

View File

@ -1,6 +0,0 @@
// Create async delay for provided timeout in milliseconds
export async function delay(timeoutMs = 1000) {
if (!timeoutMs) return;
await new Promise(resolve => setTimeout(resolve, timeoutMs));
}

View File

@ -7,7 +7,6 @@ export * from "./autobind";
export * from "./base64"; export * from "./base64";
export * from "./camelCase"; export * from "./camelCase";
export * from "./cloneJson"; export * from "./cloneJson";
export * from "./delay";
export * from "./debouncePromise"; export * from "./debouncePromise";
export * from "./defineGlobal"; export * from "./defineGlobal";
export * from "./getRandId"; export * from "./getRandId";

View File

@ -147,7 +147,7 @@ export class Router {
this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute)); this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute));
// Watch API // Watch API
this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
// Metrics API // Metrics API
this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute)); this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute));

View File

@ -1,29 +1,10 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import plimit from "p-limit";
import { delay } from "../../common/utils";
import { LensApiRequest } from "../router"; import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api"; import { LensApi } from "../lens-api";
import { KubeConfig, Watch } from "@kubernetes/client-node"; import { Watch, KubeConfig } from "@kubernetes/client-node";
import { ServerResponse } from "http"; import { ServerResponse } from "http";
import { Request } from "request"; import { Request } from "request";
import logger from "../logger"; import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END";
object?: T;
}
export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IWatchRoutePayload {
apis: string[]; // kube-api url list for subscribing to watch events
}
class ApiWatcher { class ApiWatcher {
private apiUrl: string; private apiUrl: string;
private response: ServerResponse; private response: ServerResponse;
@ -43,7 +24,6 @@ class ApiWatcher {
clearInterval(this.processor); clearInterval(this.processor);
} }
this.processor = setInterval(() => { this.processor = setInterval(() => {
if (this.response.finished) return;
const events = this.eventBuffer.splice(0); const events = this.eventBuffer.splice(0);
events.map(event => this.sendEvent(event)); events.map(event => this.sendEvent(event));
@ -53,9 +33,7 @@ class ApiWatcher {
} }
public stop() { public stop() {
if (!this.watchRequest) { if (!this.watchRequest) { return; }
return;
}
if (this.processor) { if (this.processor) {
clearInterval(this.processor); clearInterval(this.processor);
@ -64,14 +42,11 @@ class ApiWatcher {
try { try {
this.watchRequest.abort(); this.watchRequest.abort();
this.sendEvent({
const event: IKubeWatchEventStreamEnd = {
type: "STREAM_END", type: "STREAM_END",
url: this.apiUrl, url: this.apiUrl,
status: 410, status: 410,
}; });
this.sendEvent(event);
logger.debug("watch aborted"); logger.debug("watch aborted");
} catch (error) { } catch (error) {
logger.error(`Watch abort errored:${error}`); logger.error(`Watch abort errored:${error}`);
@ -90,72 +65,50 @@ class ApiWatcher {
this.watchRequest.abort(); this.watchRequest.abort();
} }
private sendEvent(evt: IKubeWatchEvent) { private sendEvent(evt: any) {
this.response.write(`${JSON.stringify(evt)}\n`); // convert to "text/event-stream" format
this.response.write(`data: ${JSON.stringify(evt)}\n\n`);
} }
} }
class WatchRoute extends LensApi { class WatchRoute extends LensApi {
private response: ServerResponse;
private setResponse(response: ServerResponse) { public async routeWatch(request: LensApiRequest) {
// clean up previous connection and stop all corresponding watch-api requests const { response, cluster} = request;
// otherwise it happens only by request timeout or something else.. const apis: string[] = request.query.getAll("api");
this.response?.destroy(); const watchers: ApiWatcher[] = [];
this.response = response;
}
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) { if (!apis.length) {
const { response, cluster, payload: { apis } = {} } = request;
if (!apis?.length) {
this.respondJson(response, { this.respondJson(response, {
message: "watch apis list is empty" message: "Empty request. Query params 'api' are not provided.",
example: "?api=/api/v1/pods&api=/api/v1/nodes",
}, 400); }, 400);
return; return;
} }
this.setResponse(response); response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Content-Type", "application/json");
response.setHeader("Cache-Control", "no-cache"); response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive"); response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
// limit concurrent k8s requests to avoid possible ECONNRESET-error
const requests = plimit(5);
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
apis.forEach(apiUrl => { apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watchers.set(apiUrl, watcher); watcher.start();
watchers.push(watcher);
requests(async () => {
if (isWatchRequestEnded) return;
await watcher.start();
await delay(100);
});
});
function onRequestEnd() {
if (isWatchRequestEnded) return;
isWatchRequestEnded = true;
requests.clearQueue();
watchers.forEach(watcher => watcher.stop());
watchers.clear();
}
request.raw.req.on("end", () => {
logger.info("Watch request end");
onRequestEnd();
}); });
request.raw.req.on("close", () => { request.raw.req.on("close", () => {
logger.info("Watch request close"); logger.debug("Watch request closed");
onRequestEnd(); watchers.map(watcher => watcher.stop());
}); });
request.raw.req.on("end", () => {
logger.debug("Watch request ended");
watchers.map(watcher => watcher.stop());
});
} }
} }

View File

@ -2,7 +2,7 @@ import type { KubeObjectStore } from "../kube-object.store";
import { action, observable } from "mobx"; import { action, observable } from "mobx";
import { autobind } from "../utils"; import { autobind } from "../utils";
import { KubeApi, parseKubeApi } from "./kube-api"; import { KubeApi } from "./kube-api";
@autobind() @autobind()
export class ApiManager { export class ApiManager {
@ -11,7 +11,7 @@ export class ApiManager {
getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) { getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) {
if (typeof pathOrCallback === "string") { if (typeof pathOrCallback === "string") {
return this.apis.get(pathOrCallback) || this.apis.get(parseKubeApi(pathOrCallback).apiBase); return this.apis.get(pathOrCallback) || this.apis.get(KubeApi.parseApi(pathOrCallback).apiBase);
} }
return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true)); return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true));

View File

@ -92,6 +92,14 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
} }
export class KubeApi<T extends KubeObject = any> { export class KubeApi<T extends KubeObject = any> {
static parseApi = parseKubeApi;
static watchAll(...apis: KubeApi[]) {
const disposers = apis.map(api => api.watch());
return () => disposers.forEach(unwatch => unwatch());
}
readonly kind: string; readonly kind: string;
readonly apiBase: string; readonly apiBase: string;
readonly apiPrefix: string; readonly apiPrefix: string;
@ -116,7 +124,7 @@ export class KubeApi<T extends KubeObject = any> {
if (!options.apiBase) { if (!options.apiBase) {
options.apiBase = objectConstructor.apiBase; options.apiBase = objectConstructor.apiBase;
} }
const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase); const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = KubeApi.parseApi(options.apiBase);
this.kind = kind; this.kind = kind;
this.isNamespaced = isNamespaced; this.isNamespaced = isNamespaced;
@ -149,7 +157,7 @@ export class KubeApi<T extends KubeObject = any> {
for (const apiUrl of apiBases) { for (const apiUrl of apiBases) {
// Split e.g. "/apis/extensions/v1beta1/ingresses" to parts // Split e.g. "/apis/extensions/v1beta1/ingresses" to parts
const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl); const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = KubeApi.parseApi(apiUrl);
// Request available resources // Request available resources
try { try {
@ -358,7 +366,7 @@ export class KubeApi<T extends KubeObject = any> {
} }
watch(): () => void { watch(): () => void {
return kubeWatchApi.subscribeApi(this); return kubeWatchApi.subscribe(this);
} }
} }

View File

@ -1,349 +1,202 @@
// Kubernetes watch-api client // Kubernetes watch-api consumer
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import type { Cluster } from "../../main/cluster"; import { computed, observable, reaction } from "mobx";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; import { stringify } from "querystring";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import type { NamespaceStore } from "../components/+namespaces/namespace.store";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils"; import { autobind, EventEmitter } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api"; import { KubeJsonApiData } from "./kube-json-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; import type { KubeObjectStore } from "../kube-object.store";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars"; import { ensureObjectSelfLink, KubeApi } from "./kube-api";
import { apiManager } from "./api-manager"; import { apiManager } from "./api-manager";
import { apiPrefix, isDevelopment } from "../../common/vars";
import { getHostedCluster } from "../../common/cluster-store";
export { IKubeWatchEvent, IKubeWatchEventStreamEnd }; export interface IKubeWatchEvent<T = any> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
export interface IKubeWatchMessage<T extends KubeObject = any> { object?: T;
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
store?: KubeObjectStore<T>;
} }
export interface IKubeWatchSubscribeStoreOptions { export interface IKubeWatchRouteEvent {
preload?: boolean; // preload store items, default: true type: "STREAM_END";
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true url: string;
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false status: number;
} }
export interface IKubeWatchReconnectOptions { export interface IKubeWatchRouteQuery {
reconnectAttempts: number; api: string | string[];
timeout: number;
}
export interface IKubeWatchLog {
message: string | Error;
meta?: object;
} }
@autobind() @autobind()
export class KubeWatchApi { export class KubeWatchApi {
private cluster: Cluster; protected evtSource: EventSource;
private namespaceStore: NamespaceStore; protected onData = new EventEmitter<[IKubeWatchEvent]>();
protected subscribers = observable.map<KubeApi, number>();
private requestId = 0; protected reconnectTimeoutMs = 5000;
private isConnected = false; protected maxReconnectsOnError = 10;
private reader: ReadableStreamReader<string>; protected reconnectAttempts = this.maxReconnectsOnError;
private subscribers = observable.map<KubeApi, number>();
// events
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get apis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat();
}
constructor() { constructor() {
this.init(); reaction(() => this.activeApis, () => this.connect(), {
}
private async init() {
const { getHostedCluster } = await import("../../common/cluster-store");
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
this.cluster = getHostedCluster();
this.namespaceStore = namespaceStore;
this.bindAutoConnect();
}
private bindAutoConnect() {
const connect = debounce(() => this.connect(), 1000);
reaction(() => this.apis, connect, {
fireImmediately: true, fireImmediately: true,
equals: comparer.structural, delay: 500,
}); });
}
window.addEventListener("online", () => this.connect()); @computed get activeApis() {
window.addEventListener("offline", () => this.disconnect()); return Array.from(this.subscribers.keys());
setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m
} }
getSubscribersCount(api: KubeApi) { getSubscribersCount(api: KubeApi) {
return this.subscribers.get(api) || 0; return this.subscribers.get(api) || 0;
} }
isAllowedApi(api: KubeApi): boolean { subscribe(...apis: KubeApi[]) {
return !!this?.cluster.isAllowedResource(api.kind);
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
const apis: KubeApi[] = [api].flat();
apis.forEach(api => { apis.forEach(api => {
if (!this.isAllowedApi(api)) return; // skip
this.subscribers.set(api, this.getSubscribersCount(api) + 1); this.subscribers.set(api, this.getSubscribersCount(api) + 1);
}); });
return () => { return () => apis.forEach(api => {
apis.forEach(api => { const count = this.getSubscribersCount(api) - 1;
const count = this.getSubscribersCount(api) - 1;
if (count <= 0) this.subscribers.delete(api); if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count); else this.subscribers.set(api, count);
});
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const unsubscribeList: (() => void)[] = [];
let isUnsubscribed = false;
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
};
if (preload) {
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (cacheLoading && store.isLoaded) return; // skip
return store.loadAll();
}));
}
}
if (waitUntilLoaded) {
Promise.all(preloading).then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
});
} else {
subscribe();
}
// unsubscribe
return () => {
if (isUnsubscribed) return;
isUnsubscribed = true;
limitRequests.clearQueue();
unsubscribeList.forEach(unsubscribe => unsubscribe());
};
}
protected async connectionCheck() {
if (!this.isConnected) {
this.log({ message: "Offline: reconnecting.." });
await this.connect();
}
this.log({
message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
meta: { connected: this.isConnected },
}); });
} }
protected async connect(apis = this.apis) { // FIXME: use POST to send apis for subscribing (list could be huge)
this.disconnect(); // close active connections first // TODO: try to use normal fetch res.body stream to consume watch-api updates
// https://github.com/lensapp/lens/issues/1898
protected async getQuery() {
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
if (!navigator.onLine || !apis.length) { await namespaceStore.whenReady;
this.isConnected = false; const { isAdmin } = getHostedCluster();
return {
api: this.activeApis.map(api => {
if (isAdmin && !api.isNamespaced) {
return api.getWatchUrl();
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
}
return [];
}).flat()
};
}
// todo: maybe switch to websocket to avoid often reconnects
@autobind()
protected async connect() {
if (this.evtSource) this.disconnect(); // close previous connection
const query = await this.getQuery();
if (!this.activeApis.length || !query.api.length) {
return; return;
} }
this.log({ const apiUrl = `${apiPrefix}/watch?${stringify(query)}`;
message: "Connecting",
meta: { apis }
});
try { this.evtSource = new EventSource(apiUrl);
const requestId = ++this.requestId; this.evtSource.onmessage = this.onMessage;
const abortController = new AbortController(); this.evtSource.onerror = this.onError;
this.writeLog("CONNECTING", query.api);
}
const request = await fetch(`${apiPrefix}/watch`, { reconnect() {
method: "POST", if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) {
body: JSON.stringify({ apis } as IWatchRoutePayload), this.reconnectAttempts = this.maxReconnectsOnError;
signal: abortController.signal, this.connect();
headers: {
"content-type": "application/json"
}
});
// request above is stale since new request-id has been issued
if (this.requestId !== requestId) {
abortController.abort();
return;
}
let jsonBuffer = "";
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.isConnected = true;
this.reader = reader;
while (true) {
const { done, value } = await reader.read();
if (done) break; // exit
const events = (jsonBuffer + value).split("\n");
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
} finally {
this.isConnected = false;
} }
} }
protected disconnect() { protected disconnect() {
this.reader?.cancel(); if (!this.evtSource) return;
this.reader = null; this.evtSource.close();
this.isConnected = false; this.evtSource.onmessage = null;
this.evtSource = null;
} }
// process received stream events, returns unprocessed buffer chunk if any protected onMessage(evt: MessageEvent) {
protected processBuffer(events: string[]): string { if (!evt.data) return;
for (const json of events) { const data = JSON.parse(evt.data);
try {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
this.onMessage.emit(message); if ((data as IKubeWatchEvent).object) {
} catch (error) { this.onData.emit(data);
return json;
}
}
return "";
}
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
message.data = data;
if (api) {
ensureObjectSelfLink(api, data.object);
const { namespace, resourceVersion } = data.object.metadata;
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
message.api = api;
message.store = apiManager.getStore(api);
}
break;
}
case "ERROR":
message.error = event as IKubeWatchEvent<KubeJsonApiError>;
break;
case "STREAM_END": {
this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, {
reconnectAttempts: 5,
timeout: 1000,
});
break;
}
}
return message;
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) {
const { apiBase, namespace } = parseKubeApi(event.url);
const api = apiManager.getApi(apiBase);
if (!api) return;
try {
await api.refreshResourceVersion({ namespace });
this.connect();
} catch (error) {
this.log({
message: new Error(`Failed to connect on single stream end: ${error}`),
meta: { event, error },
});
if (this.isActive && opts?.reconnectAttempts > 0) {
opts.reconnectAttempts--;
setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event
}
}
}
protected log({ message, meta = {} }: IKubeWatchLog) {
if (isProduction && !isDebugging) {
return;
}
const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
const isError = message instanceof Error;
const textStyle = `font-weight: bold;`;
const time = new Date().toLocaleString();
if (isError) {
console.error(logMessage, textStyle, { time, ...meta });
} else { } else {
console.info(logMessage, textStyle, { time, ...meta }); this.onRouteEvent(data);
} }
} }
protected async onRouteEvent(event: IKubeWatchRouteEvent) {
if (event.type === "STREAM_END") {
this.disconnect();
const { apiBase, namespace } = KubeApi.parseApi(event.url);
const api = apiManager.getApi(apiBase);
if (api) {
try {
await api.refreshResourceVersion({ namespace });
this.reconnect();
} catch (error) {
console.error("failed to refresh resource version", error);
if (this.subscribers.size > 0) {
setTimeout(() => {
this.onRouteEvent(event);
}, 1000);
}
}
}
}
}
protected onError(evt: MessageEvent) {
const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this;
if (evt.eventPhase === EventSource.CLOSED) {
if (attemptsRemain > 0) {
this.reconnectAttempts--;
setTimeout(() => this.connect(), reconnectTimeoutMs);
}
}
}
protected writeLog(...data: any[]) {
if (isDevelopment) {
console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data);
}
}
addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) {
const listener = (evt: IKubeWatchEvent<KubeJsonApiData>) => {
if (evt.type === "ERROR") {
return; // e.g. evt.object.message == "too old resource version"
}
const { namespace, resourceVersion } = evt.object.metadata;
const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion);
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
ensureObjectSelfLink(api, evt.object);
if (store == apiManager.getStore(api)) {
callback(evt);
}
};
this.onData.addListener(listener);
return () => this.onData.removeListener(listener);
}
reset() {
this.subscribers.clear();
}
} }
export const kubeWatchApi = new KubeWatchApi(); export const kubeWatchApi = new KubeWatchApi();

View File

@ -3,9 +3,13 @@ import "./cluster-overview.scss";
import React from "react"; import React from "react";
import { reaction } from "mobx"; import { reaction } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react"; import { disposeOnUnmount, observer } from "mobx-react";
import { eventStore } from "../+events/event.store";
import { nodesStore } from "../+nodes/nodes.store"; import { nodesStore } from "../+nodes/nodes.store";
import { podsStore } from "../+workloads-pods/pods.store"; import { podsStore } from "../+workloads-pods/pods.store";
import { getHostedCluster } from "../../../common/cluster-store"; import { getHostedCluster } from "../../../common/cluster-store";
import { isAllowedResource } from "../../../common/rbac";
import { KubeObjectStore } from "../../kube-object.store";
import { interval } from "../../utils"; import { interval } from "../../utils";
import { TabLayout } from "../layout/tab-layout"; import { TabLayout } from "../layout/tab-layout";
import { Spinner } from "../spinner"; import { Spinner } from "../spinner";
@ -13,33 +17,45 @@ import { ClusterIssues } from "./cluster-issues";
import { ClusterMetrics } from "./cluster-metrics"; import { ClusterMetrics } from "./cluster-metrics";
import { clusterOverviewStore } from "./cluster-overview.store"; import { clusterOverviewStore } from "./cluster-overview.store";
import { ClusterPieCharts } from "./cluster-pie-charts"; import { ClusterPieCharts } from "./cluster-pie-charts";
import { eventStore } from "../+events/event.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
@observer @observer
export class ClusterOverview extends React.Component { export class ClusterOverview extends React.Component {
private metricPoller = interval(60, () => this.loadMetrics()); private stores: KubeObjectStore<any>[] = [];
private subscribers: Array<() => void> = [];
private metricPoller = interval(60, this.loadMetrics);
@disposeOnUnmount
fetchMetrics = reaction(
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher
() => this.metricPoller.restart(true)
);
loadMetrics() { loadMetrics() {
getHostedCluster().available && clusterOverviewStore.loadMetrics(); getHostedCluster().available && clusterOverviewStore.loadMetrics();
} }
componentDidMount() { async componentDidMount() {
this.metricPoller.start(true); if (isAllowedResource("nodes")) {
this.stores.push(nodesStore);
}
disposeOnUnmount(this, [ if (isAllowedResource("pods")) {
kubeWatchApi.subscribeStores([nodesStore, podsStore, eventStore], { this.stores.push(podsStore);
preload: true, }
}),
reaction( if (isAllowedResource("events")) {
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher this.stores.push(eventStore);
() => this.metricPoller.restart(true) }
),
]); await Promise.all(this.stores.map(store => store.loadAll()));
this.loadMetrics();
this.subscribers = this.stores.map(store => store.subscribe());
this.metricPoller.start();
} }
componentWillUnmount() { componentWillUnmount() {
this.subscribers.forEach(dispose => dispose()); // unsubscribe all
this.metricPoller.stop(); this.metricPoller.stop();
} }

View File

@ -52,10 +52,6 @@ export class EventStore extends KubeObjectStore<KubeEvent> {
return compact(eventsWithError); return compact(eventsWithError);
} }
getWarningsCount() {
return this.getWarnings().length;
}
} }
export const eventStore = new EventStore(); export const eventStore = new EventStore();

View File

@ -2,14 +2,13 @@ import "./namespace-select.scss";
import React from "react"; import React from "react";
import { computed } from "mobx"; import { computed } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react"; import { observer } from "mobx-react";
import { Select, SelectOption, SelectProps } from "../select"; import { Select, SelectOption, SelectProps } from "../select";
import { cssNames } from "../../utils"; import { cssNames, noop } from "../../utils";
import { Icon } from "../icon"; import { Icon } from "../icon";
import { namespaceStore } from "./namespace.store"; import { namespaceStore } from "./namespace.store";
import { FilterIcon } from "../item-object-list/filter-icon"; import { FilterIcon } from "../item-object-list/filter-icon";
import { FilterType } from "../item-object-list/page-filters.store"; import { FilterType } from "../item-object-list/page-filters.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends SelectProps { interface Props extends SelectProps {
showIcons?: boolean; showIcons?: boolean;
@ -29,13 +28,17 @@ const defaultProps: Partial<Props> = {
@observer @observer
export class NamespaceSelect extends React.Component<Props> { export class NamespaceSelect extends React.Component<Props> {
static defaultProps = defaultProps as object; static defaultProps = defaultProps as object;
private unsubscribe = noop;
componentDidMount() { async componentDidMount() {
disposeOnUnmount(this, [ if (!namespaceStore.isLoaded) {
kubeWatchApi.subscribeStores([namespaceStore], { await namespaceStore.loadAll();
preload: true, }
}) this.unsubscribe = namespaceStore.subscribe();
]); }
componentWillUnmount() {
this.unsubscribe();
} }
@computed get options(): SelectOption[] { @computed get options(): SelectOption[] {
@ -57,7 +60,7 @@ export class NamespaceSelect extends React.Component<Props> {
return label || ( return label || (
<> <>
{showIcons && <Icon small material="layers"/>} {showIcons && <Icon small material="layers" />}
{value} {value}
</> </>
); );
@ -100,9 +103,9 @@ export class NamespaceSelectFilter extends React.Component {
return ( return (
<div className="flex gaps align-center"> <div className="flex gaps align-center">
<FilterIcon type={FilterType.NAMESPACE}/> <FilterIcon type={FilterType.NAMESPACE} />
<span>{namespace}</span> <span>{namespace}</span>
{isSelected && <Icon small material="check" className="box right"/>} {isSelected && <Icon small material="check" className="box right" />}
</div> </div>
); );
}} }}

View File

@ -117,15 +117,15 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
return namespaces; return namespaces;
} }
getSubscribeApis() { subscribe(apis = [this.api]) {
const { accessibleNamespaces } = getHostedCluster(); const { accessibleNamespaces } = getHostedCluster();
// if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted // if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted
if (accessibleNamespaces.length > 0) { if (accessibleNamespaces.length > 0) {
return []; return Function; // no-op
} }
return super.getSubscribeApis(); return super.subscribe(apis);
} }
protected async loadItems(params: KubeObjectStoreLoadingParams) { protected async loadItems(params: KubeObjectStoreLoadingParams) {

View File

@ -1,18 +1,17 @@
import "./service-details.scss"; import "./service-details.scss";
import React from "react"; import React from "react";
import { disposeOnUnmount, observer } from "mobx-react"; import { observer } from "mobx-react";
import { DrawerItem, DrawerTitle } from "../drawer"; import { DrawerItem, DrawerTitle } from "../drawer";
import { Badge } from "../badge"; import { Badge } from "../badge";
import { KubeEventDetails } from "../+events/kube-event-details"; import { KubeEventDetails } from "../+events/kube-event-details";
import { KubeObjectDetailsProps } from "../kube-object"; import { KubeObjectDetailsProps } from "../kube-object";
import { Service } from "../../api/endpoints"; import { Service, endpointApi } from "../../api/endpoints";
import { KubeObjectMeta } from "../kube-object/kube-object-meta"; import { KubeObjectMeta } from "../kube-object/kube-object-meta";
import { ServicePortComponent } from "./service-port-component"; import { ServicePortComponent } from "./service-port-component";
import { endpointStore } from "../+network-endpoints/endpoints.store"; import { endpointStore } from "../+network-endpoints/endpoints.store";
import { ServiceDetailsEndpoint } from "./service-details-endpoint"; import { ServiceDetailsEndpoint } from "./service-details-endpoint";
import { kubeObjectDetailRegistry } from "../../api/kube-object-detail-registry"; import { kubeObjectDetailRegistry } from "../../api/kube-object-detail-registry";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends KubeObjectDetailsProps<Service> { interface Props extends KubeObjectDetailsProps<Service> {
} }
@ -20,11 +19,10 @@ interface Props extends KubeObjectDetailsProps<Service> {
@observer @observer
export class ServiceDetails extends React.Component<Props> { export class ServiceDetails extends React.Component<Props> {
componentDidMount() { componentDidMount() {
disposeOnUnmount(this, [ if (!endpointStore.isLoaded) {
kubeWatchApi.subscribeStores([endpointStore], { endpointStore.loadAll();
preload: true, }
}), endpointApi.watch();
]);
} }
render() { render() {
@ -79,7 +77,7 @@ export class ServiceDetails extends React.Component<Props> {
)} )}
<DrawerTitle title={`Endpoint`}/> <DrawerTitle title={`Endpoint`}/>
<ServiceDetailsEndpoint endpoint={endpoint}/> <ServiceDetailsEndpoint endpoint={endpoint} />
</div> </div>
); );
} }

View File

@ -1,4 +1,3 @@
import { sum } from "lodash";
import { action, computed, observable } from "mobx"; import { action, computed, observable } from "mobx";
import { clusterApi, IClusterMetrics, INodeMetrics, Node, nodesApi } from "../../api/endpoints"; import { clusterApi, IClusterMetrics, INodeMetrics, Node, nodesApi } from "../../api/endpoints";
import { autobind } from "../../utils"; import { autobind } from "../../utils";
@ -63,10 +62,6 @@ export class NodesStore extends KubeObjectStore<Node> {
}); });
} }
getWarningsCount(): number {
return sum(this.items.map((node: Node) => node.getWarningConditions().length));
}
reset() { reset() {
super.reset(); super.reset();
this.metrics = {}; this.metrics = {};

View File

@ -9,8 +9,8 @@ import { apiManager } from "../../api/api-manager";
export class RoleBindingsStore extends KubeObjectStore<RoleBinding> { export class RoleBindingsStore extends KubeObjectStore<RoleBinding> {
api = clusterRoleBindingApi; api = clusterRoleBindingApi;
getSubscribeApis() { subscribe() {
return [clusterRoleBindingApi, roleBindingApi]; return super.subscribe([clusterRoleBindingApi, roleBindingApi]);
} }
protected sortItems(items: RoleBinding[]) { protected sortItems(items: RoleBinding[]) {

View File

@ -7,8 +7,8 @@ import { apiManager } from "../../api/api-manager";
export class RolesStore extends KubeObjectStore<Role> { export class RolesStore extends KubeObjectStore<Role> {
api = clusterRoleApi; api = clusterRoleApi;
getSubscribeApis() { subscribe() {
return [roleApi, clusterRoleApi]; return super.subscribe([roleApi, clusterRoleApi]);
} }
protected sortItems(items: Role[]) { protected sortItems(items: Role[]) {

View File

@ -1,7 +1,8 @@
import "./overview.scss"; import "./overview.scss";
import React from "react"; import React from "react";
import { disposeOnUnmount, observer } from "mobx-react"; import { observable, when } from "mobx";
import { observer } from "mobx-react";
import { OverviewStatuses } from "./overview-statuses"; import { OverviewStatuses } from "./overview-statuses";
import { RouteComponentProps } from "react-router"; import { RouteComponentProps } from "react-router";
import { IWorkloadsOverviewRouteParams } from "../+workloads"; import { IWorkloadsOverviewRouteParams } from "../+workloads";
@ -14,23 +15,60 @@ import { replicaSetStore } from "../+workloads-replicasets/replicasets.store";
import { jobStore } from "../+workloads-jobs/job.store"; import { jobStore } from "../+workloads-jobs/job.store";
import { cronJobStore } from "../+workloads-cronjobs/cronjob.store"; import { cronJobStore } from "../+workloads-cronjobs/cronjob.store";
import { Events } from "../+events"; import { Events } from "../+events";
import { KubeObjectStore } from "../../kube-object.store";
import { isAllowedResource } from "../../../common/rbac"; import { isAllowedResource } from "../../../common/rbac";
import { kubeWatchApi } from "../../api/kube-watch-api"; import { namespaceStore } from "../+namespaces/namespace.store";
interface Props extends RouteComponentProps<IWorkloadsOverviewRouteParams> { interface Props extends RouteComponentProps<IWorkloadsOverviewRouteParams> {
} }
@observer @observer
export class WorkloadsOverview extends React.Component<Props> { export class WorkloadsOverview extends React.Component<Props> {
componentDidMount() { @observable isLoading = false;
disposeOnUnmount(this, [ @observable isUnmounting = false;
kubeWatchApi.subscribeStores([
podsStore, deploymentStore, daemonSetStore, statefulSetStore, replicaSetStore, async componentDidMount() {
jobStore, cronJobStore, eventStore, const stores: KubeObjectStore[] = [
], { isAllowedResource("pods") && podsStore,
preload: true, isAllowedResource("deployments") && deploymentStore,
}), isAllowedResource("daemonsets") && daemonSetStore,
]); isAllowedResource("statefulsets") && statefulSetStore,
isAllowedResource("replicasets") && replicaSetStore,
isAllowedResource("jobs") && jobStore,
isAllowedResource("cronjobs") && cronJobStore,
isAllowedResource("events") && eventStore,
].filter(Boolean);
const unsubscribeMap = new Map<KubeObjectStore, () => void>();
const loadStores = async () => {
this.isLoading = true;
for (const store of stores) {
if (this.isUnmounting) break;
try {
await store.loadAll();
unsubscribeMap.get(store)?.(); // unsubscribe previous watcher
unsubscribeMap.set(store, store.subscribe());
} catch (error) {
console.error("loading store error", error);
}
}
this.isLoading = false;
};
namespaceStore.onContextChange(loadStores, {
fireImmediately: true,
});
await when(() => this.isUnmounting && !this.isLoading);
unsubscribeMap.forEach(dispose => dispose());
unsubscribeMap.clear();
}
componentWillUnmount() {
this.isUnmounting = true;
} }
render() { render() {

View File

@ -1,5 +1,5 @@
import React from "react"; import React from "react";
import { disposeOnUnmount, observer } from "mobx-react"; import { observer } from "mobx-react";
import { Redirect, Route, Router, Switch } from "react-router"; import { Redirect, Route, Router, Switch } from "react-router";
import { history } from "../navigation"; import { history } from "../navigation";
import { Notifications } from "./notifications"; import { Notifications } from "./notifications";
@ -42,10 +42,10 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte
import { TabLayout, TabLayoutRoute } from "./layout/tab-layout"; import { TabLayout, TabLayoutRoute } from "./layout/tab-layout";
import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog"; import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog";
import { eventStore } from "./+events/event.store"; import { eventStore } from "./+events/event.store";
import { computed, reaction, observable } from "mobx"; import { reaction, computed, observable } from "mobx";
import { nodesStore } from "./+nodes/nodes.store"; import { nodesStore } from "./+nodes/nodes.store";
import { podsStore } from "./+workloads-pods/pods.store"; import { podsStore } from "./+workloads-pods/pods.store";
import { kubeWatchApi } from "../api/kube-watch-api"; import { sum } from "lodash";
import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog"; import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog";
import { CommandContainer } from "./command-palette/command-container"; import { CommandContainer } from "./command-palette/command-container";
@ -76,26 +76,50 @@ export class App extends React.Component {
whatInput.ask(); // Start to monitor user input device whatInput.ask(); // Start to monitor user input device
} }
componentDidMount() { @observable extensionRoutes: Map<ClusterPageMenuRegistration, React.ReactNode> = new Map();
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([podsStore, nodesStore, eventStore], {
preload: true,
}),
reaction(() => this.warningsTotal, (count: number) => { async componentDidMount() {
broadcastMessage(`cluster-warning-event-count:${getHostedCluster().id}`, count); const cluster = getHostedCluster();
}), const promises: Promise<void>[] = [];
reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => { if (isAllowedResource("events") && isAllowedResource("pods")) {
this.generateExtensionTabLayoutRoutes(rootItems); promises.push(eventStore.loadAll());
}, { promises.push(podsStore.loadAll());
fireImmediately: true }
})
]); if (isAllowedResource("nodes")) {
promises.push(nodesStore.loadAll());
}
await Promise.all(promises);
if (eventStore.isLoaded && podsStore.isLoaded) {
eventStore.subscribe();
podsStore.subscribe();
}
if (nodesStore.isLoaded) {
nodesStore.subscribe();
}
reaction(() => this.warningsCount, (count) => {
broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count);
});
reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => {
this.generateExtensionTabLayoutRoutes(rootItems);
}, {
fireImmediately: true
});
} }
@computed get warningsTotal(): number { @computed
return nodesStore.getWarningsCount() + eventStore.getWarningsCount(); get warningsCount() {
let warnings = sum(nodesStore.items
.map(node => node.getWarningConditions().length));
warnings = warnings + eventStore.getWarnings().length;
return warnings;
} }
get startURL() { get startURL() {
@ -128,26 +152,6 @@ export class App extends React.Component {
return routes; return routes;
} }
renderExtensionTabLayoutRoutes() {
return clusterPageMenuRegistry.getRootItems().map((menu, index) => {
const tabRoutes = this.getTabLayoutRoutes(menu);
if (tabRoutes.length > 0) {
const pageComponent = () => <TabLayout tabs={tabRoutes}/>;
return <Route key={`extension-tab-layout-route-${index}`} component={pageComponent} path={tabRoutes.map((tab) => tab.routePath)}/>;
} else {
const page = clusterPageRegistry.getByPageTarget(menu.target);
if (page) {
return <Route key={`extension-tab-layout-route-${index}`} path={page.url} component={page.components.Page}/>;
}
}
});
}
@observable extensionRoutes: Map<ClusterPageMenuRegistration, React.ReactNode> = new Map();
generateExtensionTabLayoutRoutes(rootItems: ClusterPageMenuRegistration[]) { generateExtensionTabLayoutRoutes(rootItems: ClusterPageMenuRegistration[]) {
rootItems.forEach((menu, index) => { rootItems.forEach((menu, index) => {
let route = this.extensionRoutes.get(menu); let route = this.extensionRoutes.get(menu);
@ -178,6 +182,10 @@ export class App extends React.Component {
} }
} }
renderExtensionTabLayoutRoutes() {
return Array.from(this.extensionRoutes.values());
}
renderExtensionRoutes() { renderExtensionRoutes() {
return clusterPageRegistry.getItems().map((page, index) => { return clusterPageRegistry.getItems().map((page, index) => {
const menu = clusterPageMenuRegistry.getByPage(page); const menu = clusterPageMenuRegistry.getByPage(page);

View File

@ -2,7 +2,7 @@ import "./item-list-layout.scss";
import groupBy from "lodash/groupBy"; import groupBy from "lodash/groupBy";
import React, { ReactNode } from "react"; import React, { ReactNode } from "react";
import { computed, observable, reaction, toJS } from "mobx"; import { computed, IReactionDisposer, observable, reaction, toJS } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react"; import { disposeOnUnmount, observer } from "mobx-react";
import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog"; import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog";
import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table"; import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table";
@ -12,6 +12,7 @@ import { NoItems } from "../no-items";
import { Spinner } from "../spinner"; import { Spinner } from "../spinner";
import { ItemObject, ItemStore } from "../../item.store"; import { ItemObject, ItemStore } from "../../item.store";
import { SearchInputUrl } from "../input"; import { SearchInputUrl } from "../input";
import { namespaceStore } from "../+namespaces/namespace.store";
import { Filter, FilterType, pageFilters } from "./page-filters.store"; import { Filter, FilterType, pageFilters } from "./page-filters.store";
import { PageFiltersList } from "./page-filters-list"; import { PageFiltersList } from "./page-filters-list";
import { PageFiltersSelect } from "./page-filters-select"; import { PageFiltersSelect } from "./page-filters-select";
@ -21,7 +22,6 @@ import { MenuActions } from "../menu/menu-actions";
import { MenuItem } from "../menu"; import { MenuItem } from "../menu";
import { Checkbox } from "../checkbox"; import { Checkbox } from "../checkbox";
import { userStore } from "../../../common/user-store"; import { userStore } from "../../../common/user-store";
import { namespaceStore } from "../+namespaces/namespace.store";
// todo: refactor, split to small re-usable components // todo: refactor, split to small re-usable components
@ -40,7 +40,6 @@ export interface ItemListLayoutProps<T extends ItemObject = ItemObject> {
className: IClassName; className: IClassName;
store: ItemStore<T>; store: ItemStore<T>;
dependentStores?: ItemStore[]; dependentStores?: ItemStore[];
preloadStores?: boolean;
isClusterScoped?: boolean; isClusterScoped?: boolean;
hideFilters?: boolean; hideFilters?: boolean;
searchFilters?: SearchFilter<T>[]; searchFilters?: SearchFilter<T>[];
@ -83,7 +82,6 @@ const defaultProps: Partial<ItemListLayoutProps> = {
isSelectable: true, isSelectable: true,
isConfigurable: false, isConfigurable: false,
copyClassNameFromHeadCells: true, copyClassNameFromHeadCells: true,
preloadStores: true,
dependentStores: [], dependentStores: [],
filterItems: [], filterItems: [],
hasDetailsView: true, hasDetailsView: true,
@ -99,6 +97,10 @@ interface ItemListLayoutUserSettings {
export class ItemListLayout extends React.Component<ItemListLayoutProps> { export class ItemListLayout extends React.Component<ItemListLayoutProps> {
static defaultProps = defaultProps as object; static defaultProps = defaultProps as object;
private watchDisposers: IReactionDisposer[] = [];
@observable isUnmounting = false;
@observable userSettings: ItemListLayoutUserSettings = { @observable userSettings: ItemListLayoutUserSettings = {
showAppliedFilters: false, showAppliedFilters: false,
}; };
@ -117,28 +119,54 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
} }
async componentDidMount() { async componentDidMount() {
const { isClusterScoped, isConfigurable, tableId, preloadStores } = this.props; const { isClusterScoped, isConfigurable, tableId } = this.props;
if (isConfigurable && !tableId) { if (isConfigurable && !tableId) {
throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified"); throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified");
} }
if (preloadStores) { this.loadStores();
this.loadStores();
if (!isClusterScoped) { if (!isClusterScoped) {
disposeOnUnmount(this, [ disposeOnUnmount(this, [
namespaceStore.onContextChange(() => this.loadStores()) namespaceStore.onContextChange(() => this.loadStores())
]); ]);
}
}
async componentWillUnmount() {
this.isUnmounting = true;
this.unsubscribeStores();
}
@computed get stores() {
const { store, dependentStores } = this.props;
return new Set([store, ...dependentStores]);
}
async loadStores() {
this.unsubscribeStores(); // reset first
// load
for (const store of this.stores) {
if (this.isUnmounting) {
this.unsubscribeStores();
break;
}
try {
await store.loadAll();
this.watchDisposers.push(store.subscribe());
} catch (error) {
console.error("loading store error", error);
} }
} }
} }
private loadStores() { unsubscribeStores() {
const { store, dependentStores } = this.props; this.watchDisposers.forEach(dispose => dispose());
const stores = Array.from(new Set([store, ...dependentStores])); this.watchDisposers.length = 0;
stores.forEach(store => store.loadAll());
} }
private filterCallbacks: { [type: string]: ItemsFilter } = { private filterCallbacks: { [type: string]: ItemsFilter } = {

View File

@ -1,17 +1,15 @@
import React from "react"; import React from "react";
import { computed } from "mobx"; import { computed } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react"; import { observer } from "mobx-react";
import { cssNames } from "../../utils"; import { cssNames } from "../../utils";
import { KubeObject } from "../../api/kube-object"; import { KubeObject } from "../../api/kube-object";
import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout"; import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout";
import { KubeObjectStore } from "../../kube-object.store"; import { KubeObjectStore } from "../../kube-object.store";
import { KubeObjectMenu } from "./kube-object-menu"; import { KubeObjectMenu } from "./kube-object-menu";
import { kubeSelectedUrlParam, showDetails } from "./kube-object-details"; import { kubeSelectedUrlParam, showDetails } from "./kube-object-details";
import { kubeWatchApi } from "../../api/kube-watch-api";
export interface KubeObjectListLayoutProps extends ItemListLayoutProps { export interface KubeObjectListLayoutProps extends ItemListLayoutProps {
store: KubeObjectStore; store: KubeObjectStore;
dependentStores?: KubeObjectStore[];
} }
@observer @observer
@ -20,17 +18,6 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
return this.props.store.getByPath(kubeSelectedUrlParam.get()); return this.props.store.getByPath(kubeSelectedUrlParam.get());
} }
componentDidMount() {
const { store, dependentStores = [] } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores(stores, {
preload: true
})
]);
}
onDetails = (item: KubeObject) => { onDetails = (item: KubeObject) => {
if (this.props.onDetails) { if (this.props.onDetails) {
this.props.onDetails(item); this.props.onDetails(item);
@ -46,7 +33,6 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
<ItemListLayout <ItemListLayout
{...layoutProps} {...layoutProps}
className={cssNames("KubeObjectListLayout", className)} className={cssNames("KubeObjectListLayout", className)}
preloadStores={false} // loading handled in kubeWatchApi.subscribeStores()
detailsItem={this.selectedItem} detailsItem={this.selectedItem}
onDetails={this.onDetails} onDetails={this.onDetails}
renderItemMenu={(item) => { renderItemMenu={(item) => {

View File

@ -2,10 +2,10 @@ import type { Cluster } from "../main/cluster";
import { action, observable, reaction } from "mobx"; import { action, observable, reaction } from "mobx";
import { autobind } from "./utils"; import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object"; import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api"; import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api";
import { ItemStore } from "./item.store"; import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager"; import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api"; import { IKubeApiQueryParams, KubeApi } from "./api/kube-api";
import { KubeJsonApiData } from "./api/kube-json-api"; import { KubeJsonApiData } from "./api/kube-json-api";
export interface KubeObjectStoreLoadingParams { export interface KubeObjectStoreLoadingParams {
@ -22,6 +22,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
constructor() { constructor() {
super(); super();
this.bindWatchEventsUpdater(); this.bindWatchEventsUpdater();
kubeWatchApi.addListener(this, this.onWatchApiEvent);
} }
get query(): IKubeApiQueryParams { get query(): IKubeApiQueryParams {
@ -156,7 +157,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
@action @action
async loadFromPath(resourcePath: string) { async loadFromPath(resourcePath: string) {
const { namespace, name } = parseKubeApi(resourcePath); const { namespace, name } = KubeApi.parseApi(resourcePath);
return this.load({ name, namespace }); return this.load({ name, namespace });
} }
@ -194,29 +195,29 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
} }
// collect items from watch-api events to avoid UI blowing up with huge streams of data // collect items from watch-api events to avoid UI blowing up with huge streams of data
protected eventsBuffer = observable.array<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false }); protected eventsBuffer = observable<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected bindWatchEventsUpdater(delay = 1000) { protected bindWatchEventsUpdater(delay = 1000) {
kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage<T>) => { return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, {
if (!this.isLoaded || store !== this) return;
this.eventsBuffer.push(data);
});
reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, {
delay delay
}); });
} }
getSubscribeApis(): KubeApi[] { subscribe(apis = [this.api]) {
return [this.api]; return KubeApi.watchAll(...apis);
} }
subscribe(apis = this.getSubscribeApis()) { protected onWatchApiEvent(evt: IKubeWatchEvent) {
return kubeWatchApi.subscribeApi(apis); if (!this.isLoaded) return;
this.eventsBuffer.push(evt);
} }
@action @action
protected updateFromEventsBuffer() { protected updateFromEventsBuffer() {
if (!this.eventsBuffer.length) {
return;
}
// create latest non-observable copy of items to apply updates in one action (==single render)
const items = this.items.toJS(); const items = this.items.toJS();
for (const { type, object } of this.eventsBuffer.clear()) { for (const { type, object } of this.eventsBuffer.clear()) {