1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

store subscribing refactoring -- part 1

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-21 18:48:17 +02:00
parent 139ea14a31
commit 3e005fc611
15 changed files with 148 additions and 201 deletions

View File

@ -2,7 +2,7 @@ import type { KubeObjectStore } from "../kube-object.store";
import { action, observable } from "mobx";
import { autobind } from "../utils";
import { KubeApi } from "./kube-api";
import { KubeApi, parseKubeApi } from "./kube-api";
@autobind()
export class ApiManager {
@ -11,7 +11,7 @@ export class ApiManager {
getApi(pathOrCallback: string | ((api: KubeApi) => boolean)) {
if (typeof pathOrCallback === "string") {
return this.apis.get(pathOrCallback) || this.apis.get(KubeApi.parseApi(pathOrCallback).apiBase);
return this.apis.get(pathOrCallback) || this.apis.get(parseKubeApi(pathOrCallback).apiBase);
}
return Array.from(this.apis.values()).find(pathOrCallback ?? (() => true));

View File

@ -92,14 +92,6 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
export class KubeApi<T extends KubeObject = any> {
static parseApi = parseKubeApi;
static watchAll(...apis: KubeApi[]) {
const disposers = apis.map(api => api.watch());
return () => disposers.forEach(unwatch => unwatch());
}
readonly kind: string;
readonly apiBase: string;
readonly apiPrefix: string;
@ -124,7 +116,7 @@ export class KubeApi<T extends KubeObject = any> {
if (!options.apiBase) {
options.apiBase = objectConstructor.apiBase;
}
const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = KubeApi.parseApi(options.apiBase);
const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase);
this.kind = kind;
this.isNamespaced = isNamespaced;
@ -157,7 +149,7 @@ export class KubeApi<T extends KubeObject = any> {
for (const apiUrl of apiBases) {
// Split e.g. "/apis/extensions/v1beta1/ingresses" to parts
const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = KubeApi.parseApi(apiUrl);
const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl);
// Request available resources
try {
@ -366,7 +358,7 @@ export class KubeApi<T extends KubeObject = any> {
}
watch(): () => void {
return kubeWatchApi.subscribe(this);
return kubeWatchApi.subscribeApi(this);
}
}

View File

@ -2,13 +2,13 @@
import type { Cluster } from "../../main/cluster";
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObject } from "./kube-object";
import type { KubeObjectStore } from "../kube-object.store";
import { computed, observable, reaction } from "mobx";
import { autobind, EventEmitter } from "../utils";
import { ensureObjectSelfLink, KubeApi } from "./kube-api";
import { ensureObjectSelfLink, KubeApi, parseKubeApi } from "./kube-api";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { KubeObjectStore } from "../kube-object.store";
import { apiPrefix, isProduction } from "../../common/vars";
import { apiManager } from "./api-manager";
@ -21,6 +21,11 @@ export interface IKubeWatchMessage<T extends KubeObject = any> {
store?: KubeObjectStore<T>;
}
export interface IKubeWatchSubscribeStoreOptions {
autoLoad?: boolean;
waitUntilLoaded?: boolean;
}
export interface IKubeWatchLog {
message: string | Error;
meta?: object | any;
@ -57,17 +62,49 @@ export class KubeWatchApi {
return this.subscribers.get(api) || 0;
}
subscribe(...apis: KubeApi[]) {
subscribeApi(api: KubeApi | KubeApi[]) {
const apis: KubeApi[] = [api].flat();
apis.forEach(api => {
this.subscribers.set(api, this.getSubscribersCount(api) + 1);
});
return () => apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
return () => {
apis.forEach(api => {
const count = this.getSubscribersCount(api) - 1;
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
if (count <= 0) this.subscribers.delete(api);
else this.subscribers.set(api, count);
});
};
}
async subscribeStores(stores: KubeObjectStore[], options: IKubeWatchSubscribeStoreOptions = {}): Promise<() => void> {
this.log({
message: "Subscribing to stores",
meta: { stores, options },
});
const { autoLoad = true, waitUntilLoaded = true } = options;
const loading: Promise<any>[] = [];
if (autoLoad) {
loading.push(...stores.map(store => store.loadAll()));
}
if (waitUntilLoaded) {
try {
await Promise.all(loading);
} catch (error) {
this.log({
message: new Error("Loading stores has failed"),
meta: { stores, error, options },
})
}
}
const disposers = await Promise.all(stores.map(store => store.subscribe()));
return () => disposers.forEach(dispose => dispose()); // unsubscribe
}
protected async resolveCluster(): Promise<Cluster> {
@ -107,7 +144,7 @@ export class KubeWatchApi {
}
this.log({
message: "connecting",
message: "Connecting",
meta: payload,
});
@ -204,7 +241,7 @@ export class KubeWatchApi {
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd) {
const { apiBase, namespace } = KubeApi.parseApi(event.url);
const { apiBase, namespace } = parseKubeApi(event.url);
const api = apiManager.getApi(apiBase);
if (api) {
@ -213,7 +250,7 @@ export class KubeWatchApi {
this.connect();
} catch (error) {
this.log({
message: new Error("failed to reconnect on stream end"),
message: new Error("Failed to reconnect on stream end"),
meta: { error, event },
});
@ -227,7 +264,9 @@ export class KubeWatchApi {
}
protected log({ message, meta }: IKubeWatchLog) {
if (isProduction) return;
if (isProduction) {
return;
}
const logMessage = `%c[KUBE-WATCH-API]: ${String(message).toUpperCase()}`;
const isError = message instanceof Error;

View File

@ -3,13 +3,9 @@ import "./cluster-overview.scss";
import React from "react";
import { reaction } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { eventStore } from "../+events/event.store";
import { nodesStore } from "../+nodes/nodes.store";
import { podsStore } from "../+workloads-pods/pods.store";
import { getHostedCluster } from "../../../common/cluster-store";
import { isAllowedResource } from "../../../common/rbac";
import { KubeObjectStore } from "../../kube-object.store";
import { interval } from "../../utils";
import { TabLayout } from "../layout/tab-layout";
import { Spinner } from "../spinner";
@ -17,45 +13,32 @@ import { ClusterIssues } from "./cluster-issues";
import { ClusterMetrics } from "./cluster-metrics";
import { clusterOverviewStore } from "./cluster-overview.store";
import { ClusterPieCharts } from "./cluster-pie-charts";
import { eventStore } from "../+events/event.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
@observer
export class ClusterOverview extends React.Component {
private stores: KubeObjectStore<any>[] = [];
private subscribers: Array<() => void> = [];
private metricPoller = interval(60, this.loadMetrics);
@disposeOnUnmount
fetchMetrics = reaction(
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher
() => this.metricPoller.restart(true)
);
private metricPoller = interval(60, () => this.loadMetrics());
loadMetrics() {
getHostedCluster().available && clusterOverviewStore.loadMetrics();
}
async componentDidMount() {
if (isAllowedResource("nodes")) {
this.stores.push(nodesStore);
}
this.metricPoller.start(true);
if (isAllowedResource("pods")) {
this.stores.push(podsStore);
}
if (isAllowedResource("events")) {
this.stores.push(eventStore);
}
await Promise.all(this.stores.map(store => store.loadAll()));
this.loadMetrics();
this.subscribers = this.stores.map(store => store.subscribe());
this.metricPoller.start();
disposeOnUnmount(this, [
await kubeWatchApi.subscribeStores([nodesStore, podsStore, eventStore], {
autoLoad: true,
}),
reaction(
() => clusterOverviewStore.metricNodeRole, // Toggle Master/Worker node switcher
() => this.metricPoller.restart(true)
),
]);
}
componentWillUnmount() {
this.subscribers.forEach(dispose => dispose()); // unsubscribe all
this.metricPoller.stop();
}

View File

@ -2,9 +2,9 @@ import "./namespace-select.scss";
import React from "react";
import { computed } from "mobx";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { Select, SelectOption, SelectProps } from "../select";
import { cssNames, noop } from "../../utils";
import { cssNames } from "../../utils";
import { Icon } from "../icon";
import { namespaceStore } from "./namespace.store";
import { FilterIcon } from "../item-object-list/filter-icon";
@ -28,17 +28,14 @@ const defaultProps: Partial<Props> = {
@observer
export class NamespaceSelect extends React.Component<Props> {
static defaultProps = defaultProps as object;
private unsubscribe = noop;
async componentDidMount() {
if (!namespaceStore.isLoaded) {
await namespaceStore.loadAll();
}
this.unsubscribe = namespaceStore.subscribe();
}
componentWillUnmount() {
this.unsubscribe();
disposeOnUnmount(this, [
await namespaceStore.subscribe(),
]);
}
@computed get options(): SelectOption[] {
@ -60,7 +57,7 @@ export class NamespaceSelect extends React.Component<Props> {
return label || (
<>
{showIcons && <Icon small material="layers" />}
{showIcons && <Icon small material="layers"/>}
{value}
</>
);
@ -103,9 +100,9 @@ export class NamespaceSelectFilter extends React.Component {
return (
<div className="flex gaps align-center">
<FilterIcon type={FilterType.NAMESPACE} />
<FilterIcon type={FilterType.NAMESPACE}/>
<span>{namespace}</span>
{isSelected && <Icon small material="check" className="box right" />}
{isSelected && <Icon small material="check" className="box right"/>}
</div>
);
}}

View File

@ -117,15 +117,15 @@ export class NamespaceStore extends KubeObjectStore<Namespace> {
return namespaces;
}
subscribe(apis = [this.api]) {
getSubscribeApis() {
const { accessibleNamespaces } = getHostedCluster();
// if user has given static list of namespaces let's not start watches because watch adds stuff that's not wanted
if (accessibleNamespaces.length > 0) {
return Function; // no-op
return []; // no-op
}
return super.subscribe(apis);
return super.getSubscribeApis();
}
protected async loadItems(params: KubeObjectStoreLoadingParams) {

View File

@ -1,12 +1,12 @@
import "./service-details.scss";
import React from "react";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { DrawerItem, DrawerTitle } from "../drawer";
import { Badge } from "../badge";
import { KubeEventDetails } from "../+events/kube-event-details";
import { KubeObjectDetailsProps } from "../kube-object";
import { Service, endpointApi } from "../../api/endpoints";
import { Service } from "../../api/endpoints";
import { KubeObjectMeta } from "../kube-object/kube-object-meta";
import { ServicePortComponent } from "./service-port-component";
import { endpointStore } from "../+network-endpoints/endpoints.store";
@ -18,11 +18,13 @@ interface Props extends KubeObjectDetailsProps<Service> {
@observer
export class ServiceDetails extends React.Component<Props> {
componentDidMount() {
async componentDidMount() {
if (!endpointStore.isLoaded) {
endpointStore.loadAll();
}
endpointApi.watch();
disposeOnUnmount(this, [
await endpointStore.subscribe(),
]);
}
render() {
@ -77,7 +79,7 @@ export class ServiceDetails extends React.Component<Props> {
)}
<DrawerTitle title={`Endpoint`}/>
<ServiceDetailsEndpoint endpoint={endpoint} />
<ServiceDetailsEndpoint endpoint={endpoint}/>
</div>
);
}

View File

@ -9,8 +9,8 @@ import { apiManager } from "../../api/api-manager";
export class RoleBindingsStore extends KubeObjectStore<RoleBinding> {
api = clusterRoleBindingApi;
subscribe() {
return super.subscribe([clusterRoleBindingApi, roleBindingApi]);
getSubscribeApis() {
return [clusterRoleBindingApi, roleBindingApi];
}
protected sortItems(items: RoleBinding[]) {

View File

@ -7,8 +7,8 @@ import { apiManager } from "../../api/api-manager";
export class RolesStore extends KubeObjectStore<Role> {
api = clusterRoleApi;
subscribe() {
return super.subscribe([roleApi, clusterRoleApi]);
getSubscribeApis() {
return [roleApi, clusterRoleApi];
}
protected sortItems(items: Role[]) {

View File

@ -1,8 +1,8 @@
import "./overview.scss";
import React from "react";
import { observable, when } from "mobx";
import { observer } from "mobx-react";
import { observable } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { OverviewStatuses } from "./overview-statuses";
import { RouteComponentProps } from "react-router";
import { IWorkloadsOverviewRouteParams } from "../+workloads";
@ -15,9 +15,8 @@ import { replicaSetStore } from "../+workloads-replicasets/replicasets.store";
import { jobStore } from "../+workloads-jobs/job.store";
import { cronJobStore } from "../+workloads-cronjobs/cronjob.store";
import { Events } from "../+events";
import { KubeObjectStore } from "../../kube-object.store";
import { isAllowedResource } from "../../../common/rbac";
import { namespaceStore } from "../+namespaces/namespace.store";
import { kubeWatchApi } from "../../api/kube-watch-api";
interface Props extends RouteComponentProps<IWorkloadsOverviewRouteParams> {
}
@ -28,47 +27,18 @@ export class WorkloadsOverview extends React.Component<Props> {
@observable isUnmounting = false;
async componentDidMount() {
const stores: KubeObjectStore[] = [
isAllowedResource("pods") && podsStore,
isAllowedResource("deployments") && deploymentStore,
isAllowedResource("daemonsets") && daemonSetStore,
isAllowedResource("statefulsets") && statefulSetStore,
isAllowedResource("replicasets") && replicaSetStore,
isAllowedResource("jobs") && jobStore,
isAllowedResource("cronjobs") && cronJobStore,
isAllowedResource("events") && eventStore,
].filter(Boolean);
disposeOnUnmount(this, [
await kubeWatchApi.subscribeStores([
podsStore, deploymentStore, daemonSetStore,
statefulSetStore, replicaSetStore,
jobStore, cronJobStore, eventStore,
])
]);
const unsubscribeMap = new Map<KubeObjectStore, () => void>();
const loadStores = async () => {
this.isLoading = true;
for (const store of stores) {
if (this.isUnmounting) break;
try {
await store.loadAll();
unsubscribeMap.get(store)?.(); // unsubscribe previous watcher
unsubscribeMap.set(store, store.subscribe());
} catch (error) {
console.error("loading store error", error);
}
}
this.isLoading = false;
};
namespaceStore.onContextChange(loadStores, {
fireImmediately: true,
});
await when(() => this.isUnmounting && !this.isLoading);
unsubscribeMap.forEach(dispose => dispose());
unsubscribeMap.clear();
}
componentWillUnmount() {
this.isUnmounting = true;
// fixme: reload stores
// namespaceStore.onContextChange(loadStores, {
// fireImmediately: true,
// });
}
render() {

View File

@ -1,5 +1,5 @@
import React from "react";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { Redirect, Route, Router, Switch } from "react-router";
import { history } from "../navigation";
import { Notifications } from "./notifications";
@ -45,6 +45,7 @@ import { eventStore } from "./+events/event.store";
import { computed, reaction } from "mobx";
import { nodesStore } from "./+nodes/nodes.store";
import { podsStore } from "./+workloads-pods/pods.store";
import { kubeWatchApi } from "../api/kube-watch-api";
import { sum } from "lodash";
import { ReplicaSetScaleDialog } from "./+workloads-replicasets/replicaset-scale-dialog";
@ -77,36 +78,22 @@ export class App extends React.Component {
async componentDidMount() {
const cluster = getHostedCluster();
const promises: Promise<void>[] = [];
if (isAllowedResource("events") && isAllowedResource("pods")) {
promises.push(eventStore.loadAll());
promises.push(podsStore.loadAll());
}
disposeOnUnmount(this, [
await kubeWatchApi.subscribeStores([podsStore, nodesStore, eventStore], {
autoLoad: true,
waitUntilLoaded: true,
}),
if (isAllowedResource("nodes")) {
promises.push(nodesStore.loadAll());
}
await Promise.all(promises);
if (eventStore.isLoaded && podsStore.isLoaded) {
eventStore.subscribe();
podsStore.subscribe();
}
if (nodesStore.isLoaded) {
nodesStore.subscribe();
}
reaction(() => this.warningsCount, (count) => {
broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count);
});
reaction(() => this.warningsCount, (count) => {
broadcastMessage(`cluster-warning-event-count:${cluster.id}`, count);
}),
]);
}
@computed
get warningsCount() {
let warnings = sum(nodesStore.items
.map(node => node.getWarningConditions().length));
// todo: move to nodes-store.ts
@computed get warningsCount() {
let warnings = sum(nodesStore.items.map(node => node.getWarningConditions().length));
warnings = warnings + eventStore.getWarnings().length;

View File

@ -2,7 +2,7 @@ import "./item-list-layout.scss";
import groupBy from "lodash/groupBy";
import React, { ReactNode } from "react";
import { computed, IReactionDisposer, observable, reaction, toJS } from "mobx";
import { computed, observable, reaction, toJS } from "mobx";
import { disposeOnUnmount, observer } from "mobx-react";
import { ConfirmDialog, ConfirmDialogParams } from "../confirm-dialog";
import { Table, TableCell, TableCellProps, TableHead, TableProps, TableRow, TableRowProps, TableSortCallback } from "../table";
@ -12,7 +12,6 @@ import { NoItems } from "../no-items";
import { Spinner } from "../spinner";
import { ItemObject, ItemStore } from "../../item.store";
import { SearchInputUrl } from "../input";
import { namespaceStore } from "../+namespaces/namespace.store";
import { Filter, FilterType, pageFilters } from "./page-filters.store";
import { PageFiltersList } from "./page-filters-list";
import { PageFiltersSelect } from "./page-filters-select";
@ -97,10 +96,6 @@ interface ItemListLayoutUserSettings {
export class ItemListLayout extends React.Component<ItemListLayoutProps> {
static defaultProps = defaultProps as object;
private watchDisposers: IReactionDisposer[] = [];
@observable isUnmounting = false;
@observable userSettings: ItemListLayoutUserSettings = {
showAppliedFilters: false,
};
@ -125,50 +120,14 @@ export class ItemListLayout extends React.Component<ItemListLayoutProps> {
throw new Error("[ItemListLayout]: configurable list require props.tableId to be specified");
}
this.loadStores();
// fixme: reload stores
if (!isClusterScoped) {
disposeOnUnmount(this, [
namespaceStore.onContextChange(() => this.loadStores())
]);
// disposeOnUnmount(this, [
// namespaceStore.onContextChange(() => this.loadStores())
// ]);
}
}
async componentWillUnmount() {
this.isUnmounting = true;
this.unsubscribeStores();
}
@computed get stores() {
const { store, dependentStores } = this.props;
return new Set([store, ...dependentStores]);
}
async loadStores() {
this.unsubscribeStores(); // reset first
// load
for (const store of this.stores) {
if (this.isUnmounting) {
this.unsubscribeStores();
break;
}
try {
await store.loadAll();
this.watchDisposers.push(store.subscribe());
} catch (error) {
console.error("loading store error", error);
}
}
}
unsubscribeStores() {
this.watchDisposers.forEach(dispose => dispose());
this.watchDisposers.length = 0;
}
private filterCallbacks: { [type: string]: ItemsFilter } = {
[FilterType.SEARCH]: items => {
const { searchFilters, isSearchable } = this.props;

View File

@ -1,15 +1,17 @@
import React from "react";
import { computed } from "mobx";
import { observer } from "mobx-react";
import { disposeOnUnmount, observer } from "mobx-react";
import { cssNames } from "../../utils";
import { KubeObject } from "../../api/kube-object";
import { ItemListLayout, ItemListLayoutProps } from "../item-object-list/item-list-layout";
import { KubeObjectStore } from "../../kube-object.store";
import { KubeObjectMenu } from "./kube-object-menu";
import { kubeSelectedUrlParam, showDetails } from "./kube-object-details";
import { kubeWatchApi } from "../../api/kube-watch-api";
export interface KubeObjectListLayoutProps extends ItemListLayoutProps {
store: KubeObjectStore;
dependentStores?: KubeObjectStore[];
}
@observer
@ -18,6 +20,15 @@ export class KubeObjectListLayout extends React.Component<KubeObjectListLayoutPr
return this.props.store.getByPath(kubeSelectedUrlParam.get());
}
async componentDidMount() {
const { store, dependentStores } = this.props;
const stores = Array.from(new Set([store, ...dependentStores]));
disposeOnUnmount(this, [
await kubeWatchApi.subscribeStores(stores)
]);
}
onDetails = (item: KubeObject) => {
if (this.props.onDetails) {
this.props.onDetails(item);

View File

@ -167,7 +167,7 @@ export abstract class ItemStore<T extends ItemObject = ItemObject> {
async removeSelectedItems?(): Promise<any>;
// eslint-disable-next-line unused-imports/no-unused-vars-ts
subscribe(...args: any[]) {
async subscribe(...args: any[]): Promise<() => void> {
return noop;
}

View File

@ -5,7 +5,7 @@ import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi } from "./api/kube-api";
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api";
import { KubeJsonApiData } from "./api/kube-json-api";
export interface KubeObjectStoreLoadingParams {
@ -152,7 +152,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
@action
async loadFromPath(resourcePath: string) {
const { namespace, name } = KubeApi.parseApi(resourcePath);
const { namespace, name } = parseKubeApi(resourcePath);
return this.load({ name, namespace });
}
@ -203,8 +203,15 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
});
}
subscribe(apis = [this.api]) {
return KubeApi.watchAll(...apis);
getSubscribeApis(): KubeApi[] {
return [this.api];
}
async subscribe(apis = this.getSubscribeApis()) {
const cluster = await this.resolveCluster();
const allowedApis = apis.filter(api => cluster.isAllowedResource(api.kind));
return kubeWatchApi.subscribeApi(allowedApis);
}
@action