1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Watch-api streaming reworks (#1990)

* loading k8s resources into stores per selected namespaces -- part 1

Signed-off-by: Roman <ixrock@gmail.com>

* loading k8s resources into stores per selected namespaces -- part 2
- fix: generating helm chart id

Signed-off-by: Roman <ixrock@gmail.com>

* loading k8s resources into stores per selected namespaces -- part 3

Signed-off-by: Roman <ixrock@gmail.com>

* fixes

Signed-off-by: Roman <ixrock@gmail.com>

* fixes / responding to comments

Signed-off-by: Roman <ixrock@gmail.com>

* chore / small fixes

Signed-off-by: Roman <ixrock@gmail.com>

* Watch api does not work for non-admins with lots of namespaces #1898 -- part 1

Signed-off-by: Roman <ixrock@gmail.com>

* fixes & refactoring

Signed-off-by: Roman <ixrock@gmail.com>

* make lint happy

Signed-off-by: Roman <ixrock@gmail.com>

* reset store on loading error

Signed-off-by: Roman <ixrock@gmail.com>

* added new cluster method: cluster.isAllowedResource

Signed-off-by: Roman <ixrock@gmail.com>

* fix: loading namespaces optimizations

Signed-off-by: Roman <ixrock@gmail.com>

* fixes & refactoring

Signed-off-by: Roman <ixrock@gmail.com>

* fix: parse multiple kube-events from stream's chunk

Signed-off-by: Roman <ixrock@gmail.com>

* fix: mobx issue with accessing empty observable array by index (removes warning), use common logger

Signed-off-by: Roman <ixrock@gmail.com>

* fine-tuning

Signed-off-by: Roman <ixrock@gmail.com>

* fix: parse json stream chunks at client-side (might be partial, depends on network speed)

Signed-off-by: Roman <ixrock@gmail.com>

* store subscribing refactoring -- part 1

Signed-off-by: Roman <ixrock@gmail.com>

* store subscribing refactoring -- part 2

Signed-off-by: Roman <ixrock@gmail.com>

* store subscribing refactoring -- part 3

Signed-off-by: Roman <ixrock@gmail.com>

* store subscribing refactoring -- part 4

Signed-off-by: Roman <ixrock@gmail.com>

* auto-reconnect on online/offline status change, interval connection check

Signed-off-by: Roman <ixrock@gmail.com>

* check connection every 5m

Signed-off-by: Roman <ixrock@gmail.com>

* split concurrent watch-api requests by 10 at a time + 150ms delay before next call

Signed-off-by: Roman <ixrock@gmail.com>

* refactoring / clean up

Signed-off-by: Roman <ixrock@gmail.com>

* use `plimit` + delay for k8s watch requests

Signed-off-by: Roman <ixrock@gmail.com>

* lint fix

Signed-off-by: Roman <ixrock@gmail.com>

* added explicit `preload: true` when subscribing stores

Signed-off-by: Roman <ixrock@gmail.com>

* kubeWatchApi refactoring / fine-tuning

Signed-off-by: Roman <ixrock@gmail.com>

* clean up

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-02-01 15:49:32 +02:00 committed by GitHub
parent 7490b15aad
commit 078f952b36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 519 additions and 395 deletions

View File

@ -0,0 +1,6 @@
// Create async delay for provided timeout in milliseconds
export async function delay(timeoutMs = 1000) {
if (!timeoutMs) return;
await new Promise(resolve => setTimeout(resolve, timeoutMs));
}

View File

@ -7,6 +7,7 @@ export * from "./autobind";
export * from "./base64";
export * from "./camelCase";
export * from "./cloneJson";
export * from "./delay";
export * from "./debouncePromise";
export * from "./defineGlobal";
export * from "./getRandId";

View File

@ -146,7 +146,7 @@ export class Router {
this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute));
// Watch API
this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
// Metrics API
this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute));

View File

@ -1,10 +1,29 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import plimit from "p-limit";
import { delay } from "../../common/utils";
import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api";
import { Watch, KubeConfig } from "@kubernetes/client-node";
import { KubeConfig, Watch } from "@kubernetes/client-node";
import { ServerResponse } from "http";
import { Request } from "request";
import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END";
object?: T;
}
export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IWatchRoutePayload {
apis: string[]; // kube-api url list for subscribing to watch events
}
class ApiWatcher {
private apiUrl: string;
private response: ServerResponse;
@ -24,6 +43,7 @@ class ApiWatcher {
clearInterval(this.processor);
}
this.processor = setInterval(() => {
if (this.response.finished) return;
const events = this.eventBuffer.splice(0);
events.map(event => this.sendEvent(event));
@ -33,7 +53,9 @@ class ApiWatcher {
}
public stop() {
if (!this.watchRequest) { return; }
if (!this.watchRequest) {
return;
}
if (this.processor) {
clearInterval(this.processor);
@ -42,11 +64,14 @@ class ApiWatcher {
try {
this.watchRequest.abort();
this.sendEvent({
const event: IKubeWatchEventStreamEnd = {
type: "STREAM_END",
url: this.apiUrl,
status: 410,
});
};
this.sendEvent(event);
logger.debug("watch aborted");
} catch (error) {
logger.error(`Watch abort errored:${error}`);
@ -65,50 +90,72 @@ class ApiWatcher {
this.watchRequest.abort();
}
private sendEvent(evt: any) {
// convert to "text/event-stream" format
this.response.write(`data: ${JSON.stringify(evt)}\n\n`);
private sendEvent(evt: IKubeWatchEvent) {
this.response.write(`${JSON.stringify(evt)}\n`);
}
}
class WatchRoute extends LensApi {
private response: ServerResponse;
public async routeWatch(request: LensApiRequest) {
const { response, cluster} = request;
const apis: string[] = request.query.getAll("api");
const watchers: ApiWatcher[] = [];
private setResponse(response: ServerResponse) {
// clean up previous connection and stop all corresponding watch-api requests
// otherwise it happens only by request timeout or something else..
this.response?.destroy();
this.response = response;
}
if (!apis.length) {
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) {
const { response, cluster, payload: { apis } = {} } = request;
if (!apis?.length) {
this.respondJson(response, {
message: "Empty request. Query params 'api' are not provided.",
example: "?api=/api/v1/pods&api=/api/v1/nodes",
message: "watch apis list is empty"
}, 400);
return;
}
response.setHeader("Content-Type", "text/event-stream");
this.setResponse(response);
response.setHeader("Content-Type", "application/json");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
// limit concurrent k8s requests to avoid possible ECONNRESET-error
const requests = plimit(5);
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watcher.start();
watchers.push(watcher);
watchers.set(apiUrl, watcher);
requests(async () => {
if (isWatchRequestEnded) return;
await watcher.start();
await delay(100);
});
});
function onRequestEnd() {
if (isWatchRequestEnded) return;
isWatchRequestEnded = true;
requests.clearQueue();
watchers.forEach(watcher => watcher.stop());
watchers.clear();
}
request.raw.req.on("end", () => {
logger.info("Watch request end");
onRequestEnd();
});
request.raw.req.on("close", () => {
logger.debug("Watch request closed");
watchers.map(watcher => watcher.stop());
logger.info("Watch request close");
onRequestEnd();
});
request.raw.req.on("end", () => {
logger.debug("Watch request ended");
watchers.map(watcher => watcher.stop());
});
}
}

View File

@ -2,7 +2,7 @@ import type { KubeObjectStore } from "../kube-object.store";
import { action, observable } from "mobx";
import { autobind } from "../utils";
import { KubeApi } from "./kube-api";
import { KubeApi, parseKubeApi } from "./kube-api";
@autobind()
export class ApiManager {
@ -11,7 +11,7 @@ export class ApiManager {
getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) {
if (typeof pathOrCallback === "string") {
return this.apis.get(pathOrCallback) || this.apis.get(KubeApi.parseApi(pathOrCallback).apiBase);
return this.apis.get(pathOrCallback) || this.apis.get(parseKubeApi(pathOrCallback).apiBase);
}
return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true));

View File

@ -92,14 +92,6 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
export class KubeApi<T extends KubeObject = any> {
static parseApi = parseKubeApi;
static watchAll(...apis: KubeApi[]) {
const disposers = apis.map(api => api.watch());
return () => disposers.forEach(unwatch => unwatch());
}
readonly kind: string;
readonly apiBase: string;
readonly apiPrefix: string;
@ -124,7 +116,7 @@ export class KubeApi<T extends KubeObject = any> {
if (!options.apiBase) {
options.apiBase = objectConstructor.apiBase;
}
const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = KubeApi.parseApi(options.apiBase);
const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase);
this.kind = kind;
this.isNamespaced = isNamespaced;
@ -157,7 +149,7 @@ export class KubeApi<T extends KubeObject = any> {
for (const apiUrl of apiBases) {
// Split e.g. "/apis/extensions/v1beta1/ingresses" to parts
const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = KubeApi.parseApi(apiUrl);
const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl);
// Request available resources
try {
@ -366,7 +358,7 @@ export class KubeApi<T extends KubeObject = any> {
}
watch(): () => void {
return kubeWatchApi.subscribe(this);
return kubeWatchApi.subscribeApi(this);
}
}

View File

@ -1,202 +1,349 @@
// Kubernetes watch-api consumer
// Kubernetes watch-api client
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import { computed, observable, reaction } from "mobx";
import { stringify } from "querystring";
import { autobind, EventEmitter } from "../utils";
import { KubeJsonApiData } from "./kube-json-api";
import type { Cluster } from "../../main/cluster";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import { ensureObjectSelfLink, KubeApi } from "./kube-api";
import type { NamespaceStore } from "../components/+namespaces/namespace.store";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars";
import { apiManager } from "./api-manager";
import { apiPrefix, isDevelopment } from "../../common/vars";
import { getHostedCluster } from "../../common/cluster-store";
export interface IKubeWatchEvent<T = any> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
object?: T;
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
export interface IKubeWatchMessage<T extends KubeObject = any> {
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
store?: KubeObjectStore<T>;
}
export interface IKubeWatchRouteEvent {
type: "STREAM_END";
url: string;
status: number;
export interface IKubeWatchSubscribeStoreOptions {
preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false
}
export interface IKubeWatchRouteQuery {
api: string | string[];
export interface IKubeWatchReconnectOptions {
reconnectAttempts: number;
timeout: number;
}
export interface IKubeWatchLog {
message: string | Error;
meta?: object;
}
@autobind()
export class KubeWatchApi {
protected evtSource: EventSource;
protected onData = new EventEmitter<[IKubeWatchEvent]>();
protected subscribers = observable.map<KubeApi, number>();
protected reconnectTimeoutMs = 5000;
protected maxReconnectsOnError = 10;
protected reconnectAttempts = this.maxReconnectsOnError;
private cluster: Cluster;
private namespaceStore: NamespaceStore;
constructor() {
reaction(() => this.activeApis, () => this.connect(), {
fireImmediately: true,
delay: 500,
});
private requestId = 0;
private isConnected = false;
private reader: ReadableStreamReader<string>;
private subscribers = observable.map<KubeApi, number>();
// events
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get activeApis() {
return Array.from(this.subscribers.keys());
@computed get apis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat();
}
constructor() {
this.init();
}
private async init() {
const { getHostedCluster } = await import("../../common/cluster-store");
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
this.cluster = getHostedCluster();
this.namespaceStore = namespaceStore;
this.bindAutoConnect();
}
private bindAutoConnect() {
const connect = debounce(() => this.connect(), 1000);
reaction(() => this.apis, connect, {
fireImmediately: true,
equals: comparer.structural,
});
window.addEventListener("online", () => this.connect());
window.addEventListener("offline", () => this.disconnect());
setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m
}
getSubscribersCount(api: KubeApi) {
return this.subscribers.get(api) || 0;
}
subscribe(...apis: KubeApi[]) {
isAllowedApi(api: KubeApi): boolean {
return !!this?.cluster.isAllowedResource(api.kind);
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
const apis: KubeApi[] = [api].flat();
apis.forEach(api => {
if (!this.isAllowedApi(api)) return; // skip
this.subscribers.set(api, this.getSubscribersCount(api) + 1);
});
return () => apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
return () => {
apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
});
}
// FIXME: use POST to send apis for subscribing (list could be huge)
// TODO: try to use normal fetch res.body stream to consume watch-api updates
// https://github.com/lensapp/lens/issues/1898
protected async getQuery() {
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
const { isAdmin } = getHostedCluster();
return {
api: this.activeApis.map(api => {
if (isAdmin && !api.isNamespaced) {
return api.getWatchUrl();
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
}
return [];
}).flat()
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
});
};
}
// todo: maybe switch to websocket to avoid often reconnects
@autobind()
protected async connect() {
if (this.evtSource) this.disconnect(); // close previous connection
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const unsubscribeList: (() => void)[] = [];
let isUnsubscribed = false;
const query = await this.getQuery();
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
};
if (preload) {
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (cacheLoading && store.isLoaded) return; // skip
return store.loadAll();
}));
}
}
if (waitUntilLoaded) {
Promise.all(preloading).then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
});
} else {
subscribe();
}
// unsubscribe
return () => {
if (isUnsubscribed) return;
isUnsubscribed = true;
limitRequests.clearQueue();
unsubscribeList.forEach(unsubscribe => unsubscribe());
};
}
protected async connectionCheck() {
if (!this.isConnected) {
this.log({ message: "Offline: reconnecting.." });
await this.connect();
}
this.log({
message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
meta: { connected: this.isConnected },
});
}
protected async connect(apis = this.apis) {
this.disconnect(); // close active connections first
if (!navigator.onLine || !apis.length) {
this.isConnected = false;
if (!this.activeApis.length || !query.api.length) {
return;
}
const apiUrl = `${apiPrefix}/watch?${stringify(query)}`;
this.log({
message: "Connecting",
meta: { apis }
});
this.evtSource = new EventSource(apiUrl);
this.evtSource.onmessage = this.onMessage;
this.evtSource.onerror = this.onError;
this.writeLog("CONNECTING", query.api);
}
try {
const requestId = ++this.requestId;
const abortController = new AbortController();
reconnect() {
if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) {
this.reconnectAttempts = this.maxReconnectsOnError;
this.connect();
const request = await fetch(`${apiPrefix}/watch`, {
method: "POST",
body: JSON.stringify({ apis } as IWatchRoutePayload),
signal: abortController.signal,
headers: {
"content-type": "application/json"
}
});
// request above is stale since new request-id has been issued
if (this.requestId !== requestId) {
abortController.abort();
return;
}
let jsonBuffer = "";
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.isConnected = true;
this.reader = reader;
while (true) {
const { done, value } = await reader.read();
if (done) break; // exit
const events = (jsonBuffer + value).split("\n");
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
} finally {
this.isConnected = false;
}
}
protected disconnect() {
if (!this.evtSource) return;
this.evtSource.close();
this.evtSource.onmessage = null;
this.evtSource = null;
this.reader?.cancel();
this.reader = null;
this.isConnected = false;
}
protected onMessage(evt: MessageEvent) {
if (!evt.data) return;
const data = JSON.parse(evt.data);
// process received stream events, returns unprocessed buffer chunk if any
protected processBuffer(events: string[]): string {
for (const json of events) {
try {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
if ((data as IKubeWatchEvent).object) {
this.onData.emit(data);
} else {
this.onRouteEvent(data);
this.onMessage.emit(message);
} catch (error) {
return json;
}
}
return "";
}
protected async onRouteEvent(event: IKubeWatchRouteEvent) {
if (event.type === "STREAM_END") {
this.disconnect();
const { apiBase, namespace } = KubeApi.parseApi(event.url);
const api = apiManager.getApi(apiBase);
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
if (api) {
try {
await api.refreshResourceVersion({ namespace });
this.reconnect();
} catch (error) {
console.error("failed to refresh resource version", error);
switch (event.type) {
case "ADDED":
case "DELETED":
if (this.subscribers.size > 0) {
setTimeout(() => {
this.onRouteEvent(event);
}, 1000);
}
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
message.data = data;
if (api) {
ensureObjectSelfLink(api, data.object);
const { namespace, resourceVersion } = data.object.metadata;
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
message.api = api;
message.store = apiManager.getStore(api);
}
break;
}
case "ERROR":
message.error = event as IKubeWatchEvent<KubeJsonApiError>;
break;
case "STREAM_END": {
this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, {
reconnectAttempts: 5,
timeout: 1000,
});
break;
}
}
return message;
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) {
const { apiBase, namespace } = parseKubeApi(event.url);
const api = apiManager.getApi(apiBase);
if (!api) return;
try {
await api.refreshResourceVersion({ namespace });
this.connect();
} catch (error) {
this.log({
message: new Error(`Failed to connect on single stream end: ${error}`),
meta: { event, error },
});
if (this.isActive && opts?.reconnectAttempts > 0) {
opts.reconnectAttempts--;
setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event
}
}
}
protected onError(evt: MessageEvent) {
const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this;
if (evt.eventPhase === EventSource.CLOSED) {
if (attemptsRemain > 0) {
this.reconnectAttempts--;
setTimeout(() => this.connect(), reconnectTimeoutMs);
}
protected log({ message, meta = {} }: IKubeWatchLog) {
if (isProduction && !isDebugging) {
return;
}
}
protected writeLog(...data: any[]) {
if (isDevelopment) {
console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data);
const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
const isError = message instanceof Error;
const textStyle = `font-weight: bold;`;
const time = new Date().toLocaleString();
if (isError) {
console.error(logMessage, textStyle, { time, ...meta });
} else {
console.info(logMessage, textStyle, { time, ...meta });
}
}
addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) {
const listener = (evt: IKubeWatchEvent<KubeJsonApiData>) => {
if (evt.type === "ERROR") {
return; // e.g. evt.object.message == "too old resource version"
}
const { namespace, resourceVersion } = evt.object.metadata;
const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion);
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
ensureObjectSelfLink(api, evt.object);
if (store == apiManager.getStore(api)) {
callback(evt);
}
};
this.onData.addListener(listener);
return () => this.onData.removeListener(listener);
}
reset() {
this.subscribers.clear();
}
}
export const kubeWatchApi = new KubeWatchApi();

View File

@ -3,13 +3,9 @@ import "./cluster-overview.scss";
import React from "react";
import { reaction } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { eventStore } from "../+events/event.store";
import { nodesStore } from "../+nodes/nodes.store";
import { podsStore } from "../+workloads-pods/pods.store";
import { getHostedCluster } from "../../../common/cluster-store";
import { isAllowedResource } from "../../../common/rbac";
import { KubeObjectStore } from "../../kube-object.store";
import { interval } from "../../utils";
import { TabLayout } from "../layout/tab-layout";
import { Spinner } from "../spinner";
@ -17,45 +13,33 @@ import { ClusterIssues } from "./cluster-issues";
import { ClusterMetrics } from "./cluster-metrics";
import { clusterOverviewStore } from "./cluster-overview.store";
import { ClusterPieCharts } from "./cluster-pie-charts";
import { eventStore } from "../+events/event.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
@observer
export class ClusterOverview extends React.Component {
private stores: KubeObjectStore<any>[] = [];
private subscribers: Array<() => void> = [];
private metricPoller = interval(60, this.loadMetrics);
@disposeOnUnmount
fetchMetrics = reaction(
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher
() => this.metricPoller.restart(true)
);
private metricPoller = interval(60, () => this.loadMetrics());
loadMetrics() {
getHostedCluster().available && clusterOverviewStore.loadMetrics();
}
async componentDidMount() {
if (isAllowedResource("nodes")) {
this.stores.push(nodesStore);
}
componentDidMount() {
this.metricPoller.start(true);
if (isAllowedResource("pods")) {
this.stores.push(podsStore);
}
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([nodesStore, podsStore, eventStore], {
preload: true,
}),
if (isAllowedResource("events")) {
this.stores.push(eventStore);
}
await Promise.all(this.stores.map(store => store.loadAll()));
this.loadMetrics();
this.subscribers = this.stores.map(store => store.subscribe());
this.metricPoller.start();
reaction(
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher
() => this.metricPoller.restart(true)
),
]);
}
componentWillUnmount() {
this.subscribers.forEach(dispose => dispose()); // unsubscribe all
this.metricPoller.stop();
}

View File

@ -52,6 +52,10 @@ export class EventStore extends KubeObjectStore<KubeEvent> {
return compact(eventsWithError);
}
getWarningsCount() {
return this.getWarnings().length;
}
}
export const eventStore = new EventStore();

View File

@ -2,13 +2,14 @@ import "./namespace-select.scss";
import React from "react";
import { computed } from "mobx";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { Select, SelectOption, SelectProps } from "../select";
import { cssNames, noop } from "../../utils";
import { cssNames } from "../../utils";
import { Icon } from "../icon";
import { namespaceStore } from "./namespace.store";
import { FilterIcon } from "../item-object-list/filter-icon";
import { FilterType } from "../item-object-list/page-filters.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends SelectProps {
showIcons?: boolean;
@ -28,17 +29,13 @@ const defaultProps: Partial<Props> = {
@observer
export class NamespaceSelect extends React.Component<Props> {
static defaultProps = defaultProps as object;
private unsubscribe = noop;
async componentDidMount() {
if (!namespaceStore.isLoaded) {
await namespaceStore.loadAll();
}
this.unsubscribe = namespaceStore.subscribe();
}
componentWillUnmount() {
this.unsubscribe();
componentDidMount() {
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([namespaceStore], {
preload: true,
})
]);
}
@computed get options(): SelectOption[] {
@ -60,7 +57,7 @@ export class NamespaceSelect extends React.Component<Props> {
return label || (
<>
{showIcons && <Icon small material="layers" />}
{showIcons && <Icon small material="layers"/>}
{value}
</>
);
@ -103,9 +100,9 @@ export class NamespaceSelectFilter extends React.Component {
return (
<div className="flex gaps align-center">
<FilterIcon type={FilterType.NAMESPACE} />
<FilterIcon type={FilterType.NAMESPACE}/>
<span>{namespace}</span>
{isSelected && <Icon small material="check" className="box right" />}
{isSelected && <Icon small material="check" className="box right"/>}
</div>
);
}}

View File

@ -117,15 +117,15 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
return namespaces;
}
subscribe(apis = [this.api]) {
getSubscribeApis() {
const { accessibleNamespaces } = getHostedCluster();
// if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted
if (accessibleNamespaces.length > 0) {
return Function; // no-op
return [];
}
return super.subscribe(apis);
return super.getSubscribeApis();
}
protected async loadItems(params: KubeObjectStoreLoadingParams) {

View File

@ -1,17 +1,18 @@
import "./service-details.scss";
import React from "react";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { DrawerItem, DrawerTitle } from "../drawer";
import { Badge } from "../badge";
import { KubeEventDetails } from "../+events/kube-event-details";
import { KubeObjectDetailsProps } from "../kube-object";
import { Service, endpointApi } from "../../api/endpoints";
import { Service } from "../../api/endpoints";
import { KubeObjectMeta } from "../kube-object/kube-object-meta";
import { ServicePortComponent } from "./service-port-component";
import { endpointStore } from "../+network-endpoints/endpoints.store";
import { ServiceDetailsEndpoint } from "./service-details-endpoint";
import { kubeObjectDetailRegistry } from "../../api/kube-object-detail-registry";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends KubeObjectDetailsProps<Service> {
}
@ -19,10 +20,11 @@ interface Props extends KubeObjectDetailsProps<Service> {
@observer
export class ServiceDetails extends React.Component<Props> {
componentDidMount() {
if (!endpointStore.isLoaded) {
endpointStore.loadAll();
}
endpointApi.watch();
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([endpointStore], {
preload: true,
}),
]);
}
render() {
@ -77,7 +79,7 @@ export class ServiceDetails extends React.Component<Props> {
)}
<DrawerTitle title={`Endpoint`}/>
<ServiceDetailsEndpoint endpoint={endpoint} />
<ServiceDetailsEndpoint endpoint={endpoint}/>
</div>
);
}

View File

@ -1,3 +1,4 @@
import { sum } from "lodash";
import { action, computed, observable } from "mobx";
import { clusterApi, IClusterMetrics, INodeMetrics, Node, nodesApi } from "../../api/endpoints";
import { autobind } from "../../utils";
@ -62,6 +63,10 @@ export class NodesStore extends KubeObjectStore<Node> {
});
}
getWarningsCount(): number {
return sum(this.items.map((node: Node) => node.getWarningConditions().length));
}
reset() {
super.reset();
this.metrics = {};

View File

@ -9,8 +9,8 @@ import { apiManager } from "../../api/api-manager";
export class RoleBindingsStore extends KubeObjectStore<RoleBinding> {
api = clusterRoleBindingApi;
subscribe() {
return super.subscribe([clusterRoleBindingApi, roleBindingApi]);
getSubscribeApis() {
return [clusterRoleBindingApi, roleBindingApi];
}
protected sortItems(items: RoleBinding[]) {

View File

@ -7,8 +7,8 @@ import { apiManager } from "../../api/api-manager";
export class RolesStore extends KubeObjectStore<Role> {
api = clusterRoleApi;
subscribe() {
return super.subscribe([roleApi, clusterRoleApi]);
getSubscribeApis() {
return [roleApi, clusterRoleApi];
}
protected sortItems(items: Role[]) {

View File

@ -1,8 +1,7 @@
import "./overview.scss";
import React from "react";
import { observable, when } from "mobx";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { OverviewStatuses } from "./overview-statuses";
import { RouteComponentProps } from "react-router";
import { IWorkloadsOverviewRouteParams } from "../+workloads";
@ -15,60 +14,23 @@ import { replicaSetStore } from "../+workloads-replicasets/replicasets.store";
import { jobStore } from "../+workloads-jobs/job.store";
import { cronJobStore } from "../+workloads-cronjobs/cronjob.store";
import { Events } from "../+events";
import { KubeObjectStore } from "../../kube-object.store";
import { isAllowedResource } from "../../../common/rbac";
import { namespaceStore } from "../+namespaces/namespace.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends RouteComponentProps<IWorkloadsOverviewRouteParams> {
}
@observer
export class WorkloadsOverview extends React.Component<Props> {
@observable isLoading = false;
@observable isUnmounting = false;
async componentDidMount() {
const stores: KubeObjectStore[] = [
isAllowedResource("pods") && podsStore,
isAllowedResource("deployments") && deploymentStore,
isAllowedResource("daemonsets") && daemonSetStore,
isAllowedResource("statefulsets") && statefulSetStore,
isAllowedResource("replicasets") && replicaSetStore,
isAllowedResource("jobs") && jobStore,
isAllowedResource("cronjobs") && cronJobStore,
isAllowedResource("events") && eventStore,
].filter(Boolean);
const unsubscribeMap = new Map<KubeObjectStore, () => void>();
const loadStores = async () => {
this.isLoading = true;
for (const store of stores) {
if (this.isUnmounting) break;
try {
await store.loadAll();
unsubscribeMap.get(store)?.(); // unsubscribe previous watcher
unsubscribeMap.set(store, store.subscribe());
} catch (error) {
console.error("loading store error", error);
}
}
this.isLoading = false;
};
namespaceStore.onContextChange(loadStores, {
fireImmediately: true,
});
await when(() => this.isUnmounting && !this.isLoading);
unsubscribeMap.forEach(dispose => dispose());
unsubscribeMap.clear();
}
componentWillUnmount() {
this.isUnmounting = true;
componentDidMount() {
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([
podsStore, deploymentStore, daemonSetStore, statefulSetStore, replicaSetStore,
jobStore, cronJobStore, eventStore,
], {
preload: true,
}),
]);
}
render() {

View File

@ -1,5 +1,5 @@
import React from "react";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { Redirect, Route, Router, Switch } from "react-router";
import { history } from "../navigation";
import { Notifications } from "./notifications";
@ -42,10 +42,10 @@ import { ClusterPageMenuRegistration, clusterPageMenuRegistry } from "../../exte
import { TabLayout, TabLayoutRoute } from "./layout/tab-layout";
import { StatefulSetScaleDialog } from "./+workloads-statefulsets/statefulset-scale-dialog";
import { eventStore } from "./+events/event.store";
import { reaction, computed, observable } from "mobx";
import { computed, reaction, observable } from "mobx";
import { nodesStore } from "./+nodes/nodes.store";
import { podsStore } from "./+workloads-pods/pods.store";
import { sum } from "lodash";
import { kubeWatchApi } from "../api/kube-watch-api";
import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog";
@observer
@ -75,50 +75,26 @@ export class App extends React.Component {
whatInput.ask(); // Start to monitor user input device
}
@observable extensionRoutes: Map<ClusterPageMenuRegistration, React.ReactNode> = new Map();
componentDidMount() {
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores([podsStore, nodesStore, eventStore], {
preload: true,
}),
async componentDidMount() {
const cluster = getHostedCluster();
const promises: Promise<void>[] = [];
reaction(() => this.warningsTotal, (count: number) => {
broadcastMessage(`cluster-warning-event-count:${getHostedCluster().id}`, count);
}),
if (isAllowedResource("events") && isAllowedResource("pods")) {
promises.push(eventStore.loadAll());
promises.push(podsStore.loadAll());
}
if (isAllowedResource("nodes")) {
promises.push(nodesStore.loadAll());
}
await Promise.all(promises);
if (eventStore.isLoaded && podsStore.isLoaded) {
eventStore.subscribe();
podsStore.subscribe();
}
if (nodesStore.isLoaded) {
nodesStore.subscribe();
}
reaction(() => this.warningsCount, (count) => {
broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count);
});
reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => {
this.generateExtensionTabLayoutRoutes(rootItems);
}, {
fireImmediately: true
});
reaction(() => clusterPageMenuRegistry.getRootItems(), (rootItems) => {
this.generateExtensionTabLayoutRoutes(rootItems);
}, {
fireImmediately: true
})
]);
}
@computed
get warningsCount() {
let warnings = sum(nodesStore.items
.map(node => node.getWarningConditions().length));
warnings = warnings + eventStore.getWarnings().length;
return warnings;
@computed get warningsTotal(): number {
return nodesStore.getWarningsCount() + eventStore.getWarningsCount();
}
get startURL() {
@ -151,6 +127,26 @@ export class App extends React.Component {
return routes;
}
renderExtensionTabLayoutRoutes() {
return clusterPageMenuRegistry.getRootItems().map((menu, index) => {
const tabRoutes = this.getTabLayoutRoutes(menu);
if (tabRoutes.length > 0) {
const pageComponent = () => <TabLayout tabs={tabRoutes}/>;
return <Route key={`extension-tab-layout-route-${index}`} component={pageComponent} path={tabRoutes.map((tab) => tab.routePath)}/>;
} else {
const page = clusterPageRegistry.getByPageTarget(menu.target);
if (page) {
return <Route key={`extension-tab-layout-route-${index}`} path={page.url} component={page.components.Page}/>;
}
}
});
}
@observable extensionRoutes: Map<ClusterPageMenuRegistration, React.ReactNode> = new Map();
generateExtensionTabLayoutRoutes(rootItems: ClusterPageMenuRegistration[]) {
rootItems.forEach((menu, index) => {
let route = this.extensionRoutes.get(menu);
@ -181,10 +177,6 @@ export class App extends React.Component {
}
}
renderExtensionTabLayoutRoutes() {
return Array.from(this.extensionRoutes.values());
}
renderExtensionRoutes() {
return clusterPageRegistry.getItems().map((page, index) => {
const menu = clusterPageMenuRegistry.getByPage(page);

View File

@ -2,7 +2,7 @@ import "./item-list-layout.scss";
import groupBy from "lodash/groupBy";
import React, { ReactNode } from "react";
import { computed, IReactionDisposer, observable, reaction, toJS } from "mobx";
import { computed, observable, reaction, toJS } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog";
import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table";
@ -12,7 +12,6 @@ import { NoItems } from "../no-items";
import { Spinner } from "../spinner";
import { ItemObject, ItemStore } from "../../item.store";
import { SearchInputUrl } from "../input";
import { namespaceStore } from "../+namespaces/namespace.store";
import { Filter, FilterType, pageFilters } from "./page-filters.store";
import { PageFiltersList } from "./page-filters-list";
import { PageFiltersSelect } from "./page-filters-select";
@ -22,6 +21,7 @@ import { MenuActions } from "../menu/menu-actions";
import { MenuItem } from "../menu";
import { Checkbox } from "../checkbox";
import { userStore } from "../../../common/user-store";
import { namespaceStore } from "../+namespaces/namespace.store";
// todo: refactor, split to small re-usable components
@ -40,6 +40,7 @@ export interface ItemListLayoutProps<T extends ItemObject = ItemObject> {
className: IClassName;
store: ItemStore<T>;
dependentStores?: ItemStore[];
preloadStores?: boolean;
isClusterScoped?: boolean;
hideFilters?: boolean;
searchFilters?: SearchFilter<T>[];
@ -82,6 +83,7 @@ const defaultProps: Partial<ItemListLayoutProps> = {
isSelectable: true,
isConfigurable: false,
copyClassNameFromHeadCells: true,
preloadStores: true,
dependentStores: [],
filterItems: [],
hasDetailsView: true,
@ -97,10 +99,6 @@ interface ItemListLayoutUserSettings {
export class ItemListLayout extends React.Component<ItemListLayoutProps> {
static defaultProps = defaultProps as object;
private watchDisposers: IReactionDisposer[] = [];
@observable isUnmounting = false;
@observable userSettings: ItemListLayoutUserSettings = {
showAppliedFilters: false,
};
@ -119,54 +117,28 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
}
async componentDidMount() {
const { isClusterScoped, isConfigurable, tableId } = this.props;
const { isClusterScoped, isConfigurable, tableId, preloadStores } = this.props;
if (isConfigurable && !tableId) {
throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified");
}
this.loadStores();
if (preloadStores) {
this.loadStores();
if (!isClusterScoped) {
disposeOnUnmount(this, [
namespaceStore.onContextChange(() => this.loadStores())
]);
if (!isClusterScoped) {
disposeOnUnmount(this, [
namespaceStore.onContextChange(() => this.loadStores())
]);
}
}
}
async componentWillUnmount() {
this.isUnmounting = true;
this.unsubscribeStores();
}
@computed get stores() {
private loadStores() {
const { store, dependentStores } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
return new Set([store, ...dependentStores]);
}
async loadStores() {
this.unsubscribeStores(); // reset first
// load
for (const store of this.stores) {
if (this.isUnmounting) {
this.unsubscribeStores();
break;
}
try {
await store.loadAll();
this.watchDisposers.push(store.subscribe());
} catch (error) {
console.error("loading store error", error);
}
}
}
unsubscribeStores() {
this.watchDisposers.forEach(dispose => dispose());
this.watchDisposers.length = 0;
stores.forEach(store => store.loadAll());
}
private filterCallbacks: { [type: string]: ItemsFilter } = {

View File

@ -1,15 +1,17 @@
import React from "react";
import { computed } from "mobx";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { cssNames } from "../../utils";
import { KubeObject } from "../../api/kube-object";
import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout";
import { KubeObjectStore } from "../../kube-object.store";
import { KubeObjectMenu } from "./kube-object-menu";
import { kubeSelectedUrlParam, showDetails } from "./kube-object-details";
import { kubeWatchApi } from "../../api/kube-watch-api";
export interface KubeObjectListLayoutProps extends ItemListLayoutProps {
store: KubeObjectStore;
dependentStores?: KubeObjectStore[];
}
@observer
@ -18,6 +20,17 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
return this.props.store.getByPath(kubeSelectedUrlParam.get());
}
componentDidMount() {
const { store, dependentStores = [] } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
disposeOnUnmount(this, [
kubeWatchApi.subscribeStores(stores, {
preload: true
})
]);
}
onDetails = (item: KubeObject) => {
if (this.props.onDetails) {
this.props.onDetails(item);
@ -33,6 +46,7 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
<ItemListLayout
{...layoutProps}
className={cssNames("KubeObjectListLayout", className)}
preloadStores={false} // loading handled in kubeWatchApi.subscribeStores()
detailsItem={this.selectedItem}
onDetails={this.onDetails}
renderItemMenu={(item) => {

View File

@ -2,10 +2,10 @@ import type { Cluster } from "../main/cluster";
import { action, observable, reaction } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi } from "./api/kube-api";
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api";
import { KubeJsonApiData } from "./api/kube-json-api";
export interface KubeObjectStoreLoadingParams {
@ -22,7 +22,6 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
constructor() {
super();
this.bindWatchEventsUpdater();
kubeWatchApi.addListener(this, this.onWatchApiEvent);
}
get query(): IKubeApiQueryParams {
@ -157,7 +156,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
@action
async loadFromPath(resourcePath: string) {
const { namespace, name } = KubeApi.parseApi(resourcePath);
const { namespace, name } = parseKubeApi(resourcePath);
return this.load({ name, namespace });
}
@ -195,29 +194,29 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
// collect items from watch-api events to avoid UI blowing up with huge streams of data
protected eventsBuffer = observable<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected eventsBuffer = observable.array<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected bindWatchEventsUpdater(delay = 1000) {
return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, {
kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage<T>) => {
if (!this.isLoaded || store !== this) return;
this.eventsBuffer.push(data);
});
reaction(() => this.eventsBuffer.length > 0, this.updateFromEventsBuffer, {
delay
});
}
subscribe(apis = [this.api]) {
return KubeApi.watchAll(...apis);
getSubscribeApis(): KubeApi[] {
return [this.api];
}
protected onWatchApiEvent(evt: IKubeWatchEvent) {
if (!this.isLoaded) return;
this.eventsBuffer.push(evt);
subscribe(apis = this.getSubscribeApis()) {
return kubeWatchApi.subscribeApi(apis);
}
@action
protected updateFromEventsBuffer() {
if (!this.eventsBuffer.length) {
return;
}
// create latest non-observable copy of items to apply updates in one action (==single render)
const items = this.items.toJS();
for (const { type, object } of this.eventsBuffer.clear()) {