1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

subscribe for watching resources in single request when has admin-like access rights

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-02-02 18:39:38 +02:00
parent bd9e44109a
commit f91ce92666
2 changed files with 85 additions and 28 deletions

View File

@ -48,6 +48,7 @@ export interface ClusterState {
isAdmin: boolean;
allowedNamespaces: string[]
allowedResources: string[]
watchApiAllEnabled: boolean;
}
/**
@ -91,7 +92,6 @@ export class Cluster implements ClusterModel, ClusterState {
*/
@observable initializing = false;
/**
* Is cluster object initialized
*
@ -177,6 +177,12 @@ export class Cluster implements ClusterModel, ClusterState {
* @observable
*/
@observable isAdmin = false;
/**
* Does watch-api could be used for all resources, e.g. "/api/v1/services?watch=1"
*
* @observable
*/
@observable watchApiAllEnabled = false;
/**
* Preferences
*
@ -353,9 +359,7 @@ export class Cluster implements ClusterModel, ClusterState {
await this.refreshConnectionStatus();
if (this.accessible) {
await this.refreshAllowedResources();
this.isAdmin = await this.isClusterAdmin();
this.ready = true;
await this.refreshAccessibility();
this.ensureKubectl();
}
this.activated = true;
@ -410,13 +414,11 @@ export class Cluster implements ClusterModel, ClusterState {
await this.refreshConnectionStatus();
if (this.accessible) {
this.isAdmin = await this.isClusterAdmin();
await this.refreshAllowedResources();
await this.refreshAccessibility();
if (opts.refreshMetadata) {
this.refreshMetadata();
}
this.ready = true;
}
this.pushState();
}
@ -433,6 +435,18 @@ export class Cluster implements ClusterModel, ClusterState {
this.metadata = Object.assign(existingMetadata, metadata);
}
/**
* @internal
*/
async refreshAccessibility(): Promise<void> {
this.isAdmin = await this.isClusterAdmin();
this.watchApiAllEnabled = await this.canUseWatchApi({ resource: "*" });
await this.refreshAllowedResources();
this.ready = true;
}
/**
* @internal
*/
@ -571,6 +585,17 @@ export class Cluster implements ClusterModel, ClusterState {
});
}
/**
* @internal
*/
async canUseWatchApi(customizeResource: V1ResourceAttributes = {}): Promise<boolean> {
return this.canI({
verb: "watch",
resource: "*",
...customizeResource,
});
}
toJSON(): ClusterModel {
const model: ClusterModel = {
id: this.id,
@ -604,6 +629,7 @@ export class Cluster implements ClusterModel, ClusterState {
isAdmin: this.isAdmin,
allowedNamespaces: this.allowedNamespaces,
allowedResources: this.allowedResources,
watchApiAllEnabled: this.watchApiAllEnabled,
};
return toJS(state, {

View File

@ -8,8 +8,8 @@ import type { KubeObjectStore } from "../kube-object.store";
import plimit from "p-limit";
import debounce from "lodash/debounce";
import { autorun, comparer, computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils";
import { autorun, comparer, computed, IReactionDisposer, observable, reaction } from "mobx";
import { autobind, EventEmitter, noop } from "../utils";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { apiPrefix, isDebugging, isProduction } from "../../common/vars";
@ -18,6 +18,7 @@ import { apiManager } from "./api-manager";
export { IKubeWatchEvent, IKubeWatchEventStreamEnd };
export interface IKubeWatchMessage<T extends KubeObject = any> {
namespace?: string;
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
@ -27,7 +28,7 @@ export interface IKubeWatchMessage<T extends KubeObject = any> {
export interface IKubeWatchSubscribeStoreOptions {
preload?: boolean; // preload store items, default: true
waitUntilLoaded?: boolean; // subscribe only after loading all stores, default: true
cacheLoading?: boolean; // when enabled loading store will be skipped, default: false
loadOnce?: boolean; // check store.isLoaded to skip loading if done already, default: false
}
export interface IKubeWatchReconnectOptions {
@ -51,6 +52,12 @@ export class KubeWatchApi {
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
@computed get watchAllEnabled(): boolean {
const { isAdmin, watchApiAllEnabled } = this?.cluster;
return Boolean(isAdmin || watchApiAllEnabled);
}
@computed get isReady(): boolean {
return Boolean(this.cluster && this.namespaces);
}
@ -69,11 +76,11 @@ export class KubeWatchApi {
return [];
}
if (api.isNamespaced) {
if (api.isNamespaced && !this.watchAllEnabled) {
return this.namespaces.map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
return api.getWatchUrl();
}).flat();
}
@ -127,36 +134,54 @@ export class KubeWatchApi {
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, cacheLoading = false } = options;
preloadStores(stores: KubeObjectStore[], { loadOnce = false } = {}) {
const limitRequests = plimit(1); // load stores one by one to allow quick skipping when fast clicking btw pages
const preloading: Promise<any>[] = [];
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (store.isLoaded && loadOnce) return; // skip
return store.loadAll(this.namespaces);
}));
}
return {
loading: Promise.all(preloading),
cancelLoading: () => limitRequests.clearQueue(),
};
}
subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): () => void {
const { preload = true, waitUntilLoaded = true, loadOnce = false } = options;
const apis = new Set(stores.map(store => store.getSubscribeApis()).flat());
const unsubscribeList: (() => void)[] = [];
let isUnsubscribed = false;
const load = () => this.preloadStores(stores, { loadOnce });
let preloading = preload && load();
let cancelReloading: IReactionDisposer = noop;
const subscribe = () => {
if (isUnsubscribed) return;
apis.forEach(api => unsubscribeList.push(this.subscribeApi(api)));
};
if (preload) {
for (const store of stores) {
preloading.push(limitRequests(async () => {
if (cacheLoading && store.isLoaded) return; // skip
return store.loadAll(this.namespaces);
}));
}
}
if (waitUntilLoaded) {
Promise.all(preloading).then(subscribe, error => {
if (preloading && waitUntilLoaded) {
preloading.loading.then(subscribe, error => {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
});
});
// reload when context namespaces changes
cancelReloading = reaction(() => this.namespaces, () => {
preloading?.cancelLoading();
preloading = load();
}, {
equals: comparer.shallow,
});
} else {
subscribe();
}
@ -165,7 +190,8 @@ export class KubeWatchApi {
return () => {
if (isUnsubscribed) return;
isUnsubscribed = true;
limitRequests.clearQueue();
cancelReloading();
preloading?.cancelLoading();
unsubscribeList.forEach(unsubscribe => unsubscribe());
};
}
@ -252,6 +278,10 @@ export class KubeWatchApi {
const kubeEvent: IKubeWatchEvent = JSON.parse(json);
const message = this.getMessage(kubeEvent);
if (!this.namespaces.includes(message.namespace)) {
continue; // skip updates from non-watching resources context
}
this.onMessage.emit(message);
} catch (error) {
return json;
@ -284,6 +314,7 @@ export class KubeWatchApi {
message.api = api;
message.store = apiManager.getStore(api);
message.namespace = namespace;
}
break;
}