1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

use native k8s api watches

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-08 16:51:02 +02:00
parent 0ef0868690
commit 701259695c
13 changed files with 183 additions and 471 deletions

View File

@ -189,6 +189,7 @@
"@kubernetes/client-node": "^0.12.0",
"array-move": "^3.0.0",
"await-lock": "^2.1.0",
"byline": "^5.0.0",
"chalk": "^4.1.0",
"chokidar": "^3.4.3",
"command-exists": "1.2.9",
@ -221,6 +222,7 @@
"react": "^17.0.1",
"react-dom": "^17.0.1",
"react-router": "^5.2.0",
"readable-web-to-node-stream": "^3.0.1",
"request": "^2.88.2",
"request-promise-native": "^1.0.8",
"semver": "^7.3.2",
@ -242,6 +244,7 @@
"@pmmmwh/react-refresh-webpack-plugin": "^0.4.3",
"@testing-library/jest-dom": "^5.11.5",
"@testing-library/react": "^11.1.0",
"@types/byline": "^4.2.32",
"@types/chart.js": "^2.9.21",
"@types/circular-dependency-plugin": "^5.0.1",
"@types/color": "^3.0.1",

View File

@ -5,7 +5,7 @@ import path from "path";
import { readFile } from "fs-extra";
import { Cluster } from "./cluster";
import { apiPrefix, appName, publicPath, isDevelopment, webpackDevServerPort } from "../common/vars";
import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, watchRoute, versionRoute } from "./routes";
import { helmRoute, kubeconfigRoute, metricsRoute, portForwardRoute, resourceApplierRoute, versionRoute } from "./routes";
import logger from "./logger";
export interface RouterRequestOpts {
@ -146,9 +146,6 @@ export class Router {
this.router.add({ method: "get", path: "/version"}, versionRoute.getVersion.bind(versionRoute));
this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute));
// Watch API
this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
// Metrics API
this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute));

View File

@ -1,7 +1,6 @@
export * from "./kubeconfig-route";
export * from "./metrics-route";
export * from "./port-forward-route";
export * from "./watch-route";
export * from "./helm-route";
export * from "./resource-applier-route";
export * from "./version-route";

View File

@ -1,162 +0,0 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import plimit from "p-limit";
import { delay } from "../../common/utils";
import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api";
import { KubeConfig, Watch } from "@kubernetes/client-node";
import { ServerResponse } from "http";
import { Request } from "request";
import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END";
object?: T;
}
export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IWatchRoutePayload {
apis: string[]; // kube-api url list for subscribing to watch events
}
class ApiWatcher {
private apiUrl: string;
private response: ServerResponse;
private watchRequest: Request;
private watch: Watch;
private processor: NodeJS.Timeout;
private eventBuffer: any[] = [];
constructor(apiUrl: string, kubeConfig: KubeConfig, response: ServerResponse) {
this.apiUrl = apiUrl;
this.watch = new Watch(kubeConfig);
this.response = response;
}
public async start() {
if (this.processor) {
clearInterval(this.processor);
}
this.processor = setInterval(() => {
if (this.response.finished) return;
const events = this.eventBuffer.splice(0);
events.map(event => this.sendEvent(event));
this.response.flushHeaders();
}, 50);
this.watchRequest = await this.watch.watch(this.apiUrl, {}, this.watchHandler.bind(this), this.doneHandler.bind(this));
}
public stop() {
if (!this.watchRequest) {
return;
}
if (this.processor) {
clearInterval(this.processor);
}
logger.debug(`Stopping watcher for api: ${this.apiUrl}`);
try {
this.watchRequest.abort();
const event: IKubeWatchEventStreamEnd = {
type: "STREAM_END",
url: this.apiUrl,
status: 410,
};
this.sendEvent(event);
logger.debug("watch aborted");
} catch (error) {
logger.error(`Watch abort errored:${error}`);
}
}
private watchHandler(phase: string, obj: any) {
this.eventBuffer.push({
type: phase,
object: obj
});
}
private doneHandler(error: Error) {
if (error) logger.warn(`watch ended: ${error.toString()}`);
this.watchRequest.abort();
}
private sendEvent(evt: IKubeWatchEvent) {
this.response.write(`${JSON.stringify(evt)}\n`);
}
}
class WatchRoute extends LensApi {
private response: ServerResponse;
private setResponse(response: ServerResponse) {
// clean up previous connection and stop all corresponding watch-api requests
// otherwise it happens only by request timeout or something else..
this.response?.destroy();
this.response = response;
}
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) {
const { response, cluster, payload: { apis } = {} } = request;
if (!apis?.length) {
this.respondJson(response, {
message: "watch apis list is empty"
}, 400);
return;
}
this.setResponse(response);
response.setHeader("Content-Type", "application/json");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
// limit concurrent k8s requests to avoid possible ECONNRESET-error
const requests = plimit(5);
const watchers = new Map<string, ApiWatcher>();
let isWatchRequestEnded = false;
apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watchers.set(apiUrl, watcher);
requests(async () => {
if (isWatchRequestEnded) return;
await watcher.start();
await delay(100);
});
});
function onRequestEnd() {
if (isWatchRequestEnded) return;
isWatchRequestEnded = true;
requests.clearQueue();
watchers.forEach(watcher => watcher.stop());
watchers.clear();
}
request.raw.req.on("end", () => {
logger.info("Watch request end");
onRequestEnd();
});
request.raw.req.on("close", () => {
logger.info("Watch request close");
onRequestEnd();
});
}
}
export const watchRoute = new WatchRoute();

View File

@ -55,6 +55,20 @@ export class JsonApi<D = JsonApiData, P extends JsonApiParams = JsonApiParams> {
return this.request<T>(path, params, { ...reqInit, method: "get" });
}
getReadableStream(path: string, params?: P, init: RequestInit = {}): Promise<Response> {
let reqUrl = this.config.apiBase + path;
const reqInit: RequestInit = { ...init };
const { query } = params || {} as P;
if (query) {
const queryString = stringify(query);
reqUrl += (reqUrl.includes("?") ? "&" : "?") + queryString;
}
return fetch(reqUrl, reqInit);
}
post<T = D>(path: string, params?: P, reqInit: RequestInit = {}) {
return this.request<T>(path, params, { ...reqInit, method: "post" });
}

View File

@ -9,7 +9,9 @@ import { apiKube } from "./index";
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
import { kubeWatchApi } from "./kube-watch-api";
import byline from "byline";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { IKubeWatchEvent, IKubeWatchMessage } from "./kube-watch-api";
export interface IKubeApiOptions<T extends KubeObject> {
/**
@ -91,6 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
}
type KubeApiWatchOptions = {
namespace: string;
callback?: (data: IKubeWatchEvent) => void;
};
export class KubeApi<T extends KubeObject = any> {
readonly kind: string;
readonly apiBase: string;
@ -104,6 +111,7 @@ export class KubeApi<T extends KubeObject = any> {
public objectConstructor: IKubeObjectConstructor<T>;
protected request: KubeJsonApi;
protected resourceVersions = new Map<string, string>();
protected watchDisposer: () => void;
constructor(protected options: IKubeApiOptions<T>) {
const {
@ -357,8 +365,82 @@ export class KubeApi<T extends KubeObject = any> {
});
}
watch(): () => void {
return kubeWatchApi.subscribeApi(this);
watch(opts: KubeApiWatchOptions = { namespace: "" }): () => void {
const { namespace, callback } = opts;
const watchUrl = this.getWatchUrl(namespace);
const abortController = new AbortController();
const responsePromise = this.request.getReadableStream(watchUrl, null, { signal: abortController.signal });
let disposed = false;
responsePromise.then((response) => {
const nodeStream = new ReadableWebToNodeStream(response.body);
const stream = byline(nodeStream);
stream.on("data", (line) => {
try {
const data: IKubeWatchEvent = JSON.parse(line);
console.log("data", data);
if (callback) {
callback(data);
}
} catch (ignore) {
// ignore parse errors
}
});
stream.on("close", () => {
setTimeout(() => {
if (!disposed) this.watch({namespace, callback});
}, 1000);
});
stream.on("error", (error) => {
console.error("stream error", error);
});
}, (error) => {
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
console.error("watch rejected", error);
}).catch((error) => {
console.error("watch error", error);
});
const disposer = () => {
disposed = true;
abortController.abort();
};
return disposer;
}
protected generateMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
message.data = data;
ensureObjectSelfLink(this, data.object);
const { namespace, resourceVersion } = data.object.metadata;
this.setResourceVersion(namespace, resourceVersion);
this.setResourceVersion("", resourceVersion);
message.api = this;
message.namespace = namespace;
break;
}
}
return message;
}
}

View File

@ -1,21 +1,21 @@
// Kubernetes watch-api client
// API: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import type { ClusterContext } from "../components/context";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { comparer, computed, IReactionDisposer, observable, reaction, when } from "mobx";
import { autobind, EventEmitter, noop } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { comparer, IReactionDisposer, observable, reaction, when } from "mobx";
import { autobind, noop } from "../utils";
import { KubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars";
import { apiManager } from "./api-manager";
import { isDebugging, isProduction } from "../../common/vars";
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
export interface IKubeWatchEvent<T = KubeJsonApiData> {
type: "ADDED" | "MODIFIED" | "DELETED";
object?: T;
}
export interface IKubeWatchMessage<T extends KubeObject = any> {
namespace?: string;
@ -32,11 +32,6 @@ export interface IKubeWatchSubscribeStoreOptions {
loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false
}
export interface IKubeWatchReconnectOptions {
reconnectAttempts: number;
timeout: number;
}
export interface IKubeWatchLog {
message: string | string[] | Error;
meta?: object;
@ -45,87 +40,24 @@ export interface IKubeWatchLog {
@autobind()
export class KubeWatchApi {
private requestId = 0;
private reader: ReadableStreamReader<string>;
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@observable context: ClusterContext = null;
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
contextReady = when(() => Boolean(this.context));
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get apis(): string[] {
if (!this.context) {
return [];
}
return Array.from(this.subscribers.keys()).map(api => {
if (!this.isAllowedApi(api)) {
return [];
}
// TODO: optimize - check when all namespaces are selected and then request all in one
if (api.isNamespaced && !this.context.cluster.isGlobalWatchEnabled) {
return this.context.contextNamespaces.map(namespace => api.getWatchUrl(namespace));
}
return api.getWatchUrl();
}).flat();
}
constructor() {
this.init();
}
private async init() {
await this.contextReady;
this.bindAutoConnect();
}
private bindAutoConnect() {
const connect = debounce(() => this.connect(), 1000);
reaction(() => this.apis, connect, {
fireImmediately: true,
equals: comparer.structural,
});
window.addEventListener("online", () => this.connect());
window.addEventListener("offline", () => this.disconnect());
setInterval(() => this.connectionCheck(), 60000 * 5); // every 5m
}
getSubscribersCount(api: KubeApi) {
return this.subscribers.get(api) || 0;
}
isAllowedApi(api: KubeApi): boolean {
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
}
subscribeApi(api: KubeApi | KubeApi[]): () => void {
const apis: KubeApi[] = [api].flat();
apis.forEach(api => {
if (!this.isAllowedApi(api)) return; // skip
this.subscribers.set(api, this.getSubscribersCount(api) + 1);
});
return () => {
apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
});
};
}
preloadStores(stores: KubeObjectStore[], opts: { namespaces?: string[], loadOnce?: boolean } = {}) {
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
@ -146,7 +78,6 @@ export class KubeWatchApi {
subscribeStores(stores: KubeObjectStore[], opts: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, loadOnce = false, } = opts;
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const subscribingNamespaces = opts.namespaces ?? this.context?.allNamespaces ?? [];
const unsubscribeList: Function[] = [];
let isUnsubscribed = false;
@ -157,7 +88,10 @@ export class KubeWatchApi {
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
stores.forEach((store) => {
unsubscribeList.push(store.subscribe());
});
};
if (preloading) {
@ -191,180 +125,6 @@ export class KubeWatchApi {
};
}
protected async connectionCheck() {
if (!this.isConnected) {
this.log({ message: "Offline: reconnecting.." });
await this.connect();
}
this.log({
message: `Connection check: ${this.isConnected ? "online" : "offline"}`,
meta: { connected: this.isConnected },
});
}
protected async connect(apis = this.apis) {
this.disconnect(); // close active connections first
if (!navigator.onLine || !apis.length) {
this.isConnected = false;
return;
}
this.log({
message: "Connecting",
meta: { apis }
});
try {
const requestId = ++this.requestId;
const abortController = new AbortController();
const request = await fetch(`${apiPrefix}/watch`, {
method: "POST",
body: JSON.stringify({ apis } as IWatchRoutePayload),
signal: abortController.signal,
headers: {
"content-type": "application/json"
}
});
// request above is stale since new request-id has been issued
if (this.requestId !== requestId) {
abortController.abort();
return;
}
let jsonBuffer = "";
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.isConnected = true;
this.reader = reader;
while (true) {
const { done, value } = await reader.read();
if (done) break; // exit
const events = (jsonBuffer + value).trim().split("\n");
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
} finally {
this.isConnected = false;
}
}
protected disconnect() {
this.reader?.cancel();
this.reader = null;
this.isConnected = false;
}
// process received stream events, returns unprocessed buffer chunk if any
protected processBuffer(events: string[]): string {
for (const json of events) {
try {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
const { data, namespace } = message;
// log all data events
if (data) {
this.log({
message: `[${data.type}] ${data.object.kind} in ${namespace || "(cluster)"}`,
meta: data,
cssStyle: `color: ${[
data.type === "ADDED" && "green",
data.type === "MODIFIED" && "darkgray",
data.type === "DELETED" && "red",
].filter(Boolean)};`,
});
}
// skip updates from non-watching resources context
if (!namespace || this.context?.contextNamespaces.includes(namespace)) {
this.onMessage.emit(message);
}
} catch (error) {
return json;
}
}
return "";
}
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
message.data = data;
if (api) {
ensureObjectSelfLink(api, data.object);
const { namespace, resourceVersion } = data.object.metadata;
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
message.api = api;
message.store = apiManager.getStore(api);
message.namespace = namespace;
}
break;
}
case "ERROR":
message.error = event as IKubeWatchEvent<KubeJsonApiError>;
break;
case "STREAM_END": {
this.onServerStreamEnd(event as IKubeWatchEventStreamEnd, {
reconnectAttempts: 5,
timeout: 1000,
});
break;
}
}
return message;
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd, opts?: IKubeWatchReconnectOptions) {
const { apiBase, namespace } = parseKubeApi(event.url);
const api = apiManager.getApi(apiBase);
if (!api) return;
try {
await api.refreshResourceVersion({ namespace });
this.connect();
} catch (error) {
this.log({
message: new Error(`Failed to connect on single stream end: ${error}`),
meta: { event, error },
});
if (this.isActive && opts?.reconnectAttempts > 0) {
opts.reconnectAttempts--;
setTimeout(() => this.onServerStreamEnd(event, opts), opts.timeout); // repeat event
}
}
}
protected log({ message, cssStyle = "", meta = {} }: IKubeWatchLog) {
if (isProduction && !isDebugging) {
return;

View File

@ -6,7 +6,6 @@ import { OverviewWorkloadStatus } from "./overview-workload-status";
import { Link } from "react-router-dom";
import { workloadURL, workloadStores } from "../+workloads";
import { namespaceStore } from "../+namespaces/namespace.store";
import { PageFiltersList } from "../item-object-list/page-filters-list";
import { NamespaceSelectFilter } from "../+namespaces/namespace-select";
import { isAllowedResource, KubeResource } from "../../../common/rbac";
import { ResourceNames } from "../../../renderer/utils/rbac";
@ -50,7 +49,6 @@ export class OverviewStatuses extends React.Component {
<h5 className="box grow">Overview</h5>
<NamespaceSelectFilter />
</div>
<PageFiltersList />
<div className="workloads">
{workloads}
</div>

View File

@ -181,11 +181,7 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
@computed get filters() {
let { activeFilters } = pageFilters;
const { isClusterScoped, isSearchable, searchFilters } = this.props;
if (isClusterScoped) {
activeFilters = activeFilters.filter(({ type }) => type !== FilterType.NAMESPACE);
}
const { isSearchable, searchFilters } = this.props;
if (!(isSearchable && searchFilters)) {
activeFilters = activeFilters.filter(({ type }) => type !== FilterType.SEARCH);
@ -341,8 +337,8 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
}
renderInfo() {
const { allItems, items, isReady, userSettings, filters } = this;
const allItemsCount = allItems.length;
const { items, isReady, userSettings, filters } = this;
const allItemsCount = this.props.store.getTotalCount();
const itemsCount = items.length;
const isFiltered = isReady && filters.length > 0;

View File

@ -1,6 +1,5 @@
import { computed, observable, reaction } from "mobx";
import { autobind } from "../../utils";
import { namespaceStore } from "../+namespaces/namespace.store";
import { searchUrlParam } from "../input/search-input-url";
export enum FilterType {
@ -24,33 +23,6 @@ export class PageFiltersStore {
constructor() {
this.syncWithGlobalSearch();
this.syncWithContextNamespace();
}
// todo: refactor
protected syncWithContextNamespace() {
const disposers = [
reaction(() => this.getValues(FilterType.NAMESPACE), filteredNs => {
if (filteredNs.length !== namespaceStore.contextNamespaces.length) {
namespaceStore.setContext(filteredNs);
}
}),
namespaceStore.onContextChange(namespaces => {
const filteredNs = this.getValues(FilterType.NAMESPACE);
const isChanged = namespaces.length !== filteredNs.length;
if (isChanged) {
this.filters.replace([
...this.filters.filter(({ type }) => type !== FilterType.NAMESPACE),
...namespaces.map(ns => ({ type: FilterType.NAMESPACE, value: ns })),
]);
}
}, {
fireImmediately: true
})
];
return () => disposers.forEach(dispose => dispose());
}
protected syncWithGlobalSearch() {

View File

@ -26,6 +26,10 @@ export abstract class ItemStore<T extends ItemObject = ItemObject> {
return this.items.toJS();
}
public getTotalCount(): number {
return this.items.length;
}
getByName(name: string, ...args: any[]): T;
getByName(name: string): T {
return this.items.find(item => item.getName() === name);

View File

@ -3,7 +3,7 @@ import type { ClusterContext } from "./components/context";
import { action, computed, observable, reaction, when } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api";
import { IKubeWatchEvent } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api";
@ -21,6 +21,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
abstract api: KubeApi<T>;
public readonly limit?: number;
public readonly bufferSize: number = 50000;
private loadedNamespaces: string[] = [];
contextReady = when(() => Boolean(this.context));
@ -43,6 +44,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
});
}
getTotalCount(): number {
return this.contextItems.length;
}
get query(): IKubeApiQueryParams {
const { limit } = this;
@ -107,8 +112,12 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
const isLoadingAll = this.context.allNamespaces.every(ns => namespaces.includes(ns));
if (isLoadingAll) {
this.loadedNamespaces = [];
return api.list({}, this.query);
} else {
this.loadedNamespaces = namespaces;
return Promise // load resources per namespace
.all(namespaces.map(namespace => api.list({ namespace })))
.then(items => items.flat());
@ -248,11 +257,6 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
protected eventsBuffer = observable.array<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected bindWatchEventsUpdater(delay = 1000) {
kubeWatchApi.onMessage.addListener((evt: IKubeWatchMessage<T>) => {
if (!this.isLoaded || evt.store !== this) return;
this.eventsBuffer.push(evt.data);
});
reaction(() => this.eventsBuffer.length, this.updateFromEventsBuffer, {
delay
});
@ -263,7 +267,29 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
subscribe(apis = this.getSubscribeApis()) {
return kubeWatchApi.subscribeApi(apis);
let disposers: {(): void}[] = [];
const callback = (data: IKubeWatchEvent) => {
this.eventsBuffer.push(data);
};
if (this.context.cluster.isGlobalWatchEnabled) {
disposers = apis.map(api => api.watch({
namespace: "",
callback: (data) => callback(data)
}));
} else {
apis.map(api => {
this.loadedNamespaces.forEach((namespace) => {
disposers.push(api.watch({
namespace,
callback: (data) => callback(data)
}));
});
});
}
return () => disposers.forEach(dispose => dispose());
}
@action

View File

@ -1079,6 +1079,13 @@
resolved "https://registry.yarnpkg.com/@types/boom/-/boom-7.3.0.tgz#33280c5552d4cfabc21b8b7e0f6d29292decd985"
integrity sha512-PH7bfkt1nu4pnlxz+Ws+wwJJF1HE12W3ia+Iace2JT7q56DLH3hbyjOJyNHJYRxk3PkKaC36fHfHKyeG1rMgCA==
"@types/byline@^4.2.32":
version "4.2.32"
resolved "https://registry.yarnpkg.com/@types/byline/-/byline-4.2.32.tgz#9d35ec15968056118548412ee24c2c3026c997dc"
integrity sha512-qtlm/J6XOO9p+Ep/ZB5+mCFEDhzWDDHWU4a1eReN7lkPZXW9rkloq2jcAhvKKmlO5tL2GSvKROb+PTsNVhBiyQ==
dependencies:
"@types/node" "*"
"@types/caseless@*":
version "0.12.2"
resolved "https://registry.yarnpkg.com/@types/caseless/-/caseless-0.12.2.tgz#f65d3d6389e01eeb458bd54dc8f52b95a9463bc8"
@ -1608,6 +1615,14 @@
"@types/prop-types" "*"
csstype "^3.0.2"
"@types/readable-stream@^2.3.9":
version "2.3.9"
resolved "https://registry.yarnpkg.com/@types/readable-stream/-/readable-stream-2.3.9.tgz#40a8349e6ace3afd2dd1b6d8e9b02945de4566a9"
integrity sha512-sqsgQqFT7HmQz/V5jH1O0fvQQnXAJO46Gg9LRO/JPfjmVmGUlcx831TZZO3Y3HtWhIkzf3kTsNT0Z0kzIhIvZw==
dependencies:
"@types/node" "*"
safe-buffer "*"
"@types/relateurl@*":
version "0.2.28"
resolved "https://registry.yarnpkg.com/@types/relateurl/-/relateurl-0.2.28.tgz#6bda7db8653fa62643f5ee69e9f69c11a392e3a6"
@ -11449,6 +11464,14 @@ readable-stream@~1.1.10:
isarray "0.0.1"
string_decoder "~0.10.x"
readable-web-to-node-stream@^3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/readable-web-to-node-stream/-/readable-web-to-node-stream-3.0.1.tgz#3f619b1bc5dd73a4cfe5c5f9b4f6faba55dff845"
integrity sha512-4zDC6CvjUyusN7V0QLsXVB7pJCD9+vtrM9bYDRv6uBQ+SKfx36rp5AFNPRgh9auKRul/a1iFZJYXcCbwRL+SaA==
dependencies:
"@types/readable-stream" "^2.3.9"
readable-stream "^3.6.0"
readdir-scoped-modules@^1.0.0, readdir-scoped-modules@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/readdir-scoped-modules/-/readdir-scoped-modules-1.1.0.tgz#8d45407b4f870a0dcaebc0e28670d18e74514309"
@ -11867,16 +11890,16 @@ rxjs@^6.5.2:
dependencies:
tslib "^1.9.0"
safe-buffer@*, safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0:
version "5.2.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==
safe-buffer@5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
integrity sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==
safe-buffer@>=5.1.0, safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@^5.2.0, safe-buffer@~5.2.0:
version "5.2.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.2.1.tgz#1eaf9fa9bdb1fdd4ec75f58f9cdb4e6b7827eec6"
integrity sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==
safe-regex@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/safe-regex/-/safe-regex-1.1.0.tgz#40a3669f3b077d1e943d44629e157dd48023bf2e"