mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Fix KubeObjectStore not correctly tracking loading of namespaces (#2266)
This commit is contained in:
parent
2c3750c240
commit
082885cabf
@ -11,10 +11,12 @@ export * from "./debouncePromise";
|
||||
export * from "./defineGlobal";
|
||||
export * from "./delay";
|
||||
export * from "./disposer";
|
||||
export * from "./disposer";
|
||||
export * from "./downloadFile";
|
||||
export * from "./escapeRegExp";
|
||||
export * from "./getRandId";
|
||||
export * from "./openExternal";
|
||||
export * from "./reject-promise";
|
||||
export * from "./saveToAppFiles";
|
||||
export * from "./singleton";
|
||||
export * from "./splitArray";
|
||||
|
||||
13
src/common/utils/reject-promise.ts
Normal file
13
src/common/utils/reject-promise.ts
Normal file
@ -0,0 +1,13 @@
|
||||
import "abort-controller/polyfill";
|
||||
|
||||
/**
|
||||
* Creates a new promise that will be rejected when the signal rejects.
|
||||
*
|
||||
* Useful for `Promise.race()` applications.
|
||||
* @param signal The AbortController's signal to reject with
|
||||
*/
|
||||
export function rejectPromiseBy(signal: AbortSignal): Promise<void> {
|
||||
return new Promise((_, reject) => {
|
||||
signal.addEventListener("abort", reject);
|
||||
});
|
||||
}
|
||||
@ -50,6 +50,11 @@ export interface IKubeApiQueryParams {
|
||||
fieldSelector?: string | string[]; // restrict list of objects by their fields, e.g. fieldSelector: "field=name"
|
||||
}
|
||||
|
||||
export interface KubeApiListOptions {
|
||||
namespace?: string;
|
||||
reqInit?: RequestInit;
|
||||
}
|
||||
|
||||
export interface IKubePreferredVersion {
|
||||
preferredVersion?: {
|
||||
version: string;
|
||||
|
||||
@ -1,17 +1,19 @@
|
||||
import type { ClusterContext } from "./components/context";
|
||||
|
||||
import { action, computed, observable, reaction, when } from "mobx";
|
||||
import { autobind } from "./utils";
|
||||
import { autobind, noop, rejectPromiseBy } from "./utils";
|
||||
import { KubeObject, KubeStatus } from "./api/kube-object";
|
||||
import { IKubeWatchEvent } from "./api/kube-watch-api";
|
||||
import { ItemStore } from "./item.store";
|
||||
import { apiManager } from "./api/api-manager";
|
||||
import { IKubeApiQueryParams, KubeApi, parseKubeApi } from "./api/kube-api";
|
||||
import { KubeJsonApiData } from "./api/kube-json-api";
|
||||
import { Notifications } from "./components/notifications";
|
||||
|
||||
export interface KubeObjectStoreLoadingParams {
|
||||
namespaces: string[];
|
||||
api?: KubeApi;
|
||||
reqInit?: RequestInit;
|
||||
}
|
||||
|
||||
@autobind()
|
||||
@ -21,9 +23,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
abstract api: KubeApi<T>;
|
||||
public readonly limit?: number;
|
||||
public readonly bufferSize: number = 50000;
|
||||
private loadedNamespaces: string[] = [];
|
||||
@observable private loadedNamespaces?: string[];
|
||||
|
||||
contextReady = when(() => Boolean(this.context));
|
||||
namespacesReady = when(() => Boolean(this.loadedNamespaces));
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
@ -103,10 +106,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
}
|
||||
}
|
||||
|
||||
protected async loadItems({ namespaces, api }: KubeObjectStoreLoadingParams): Promise<T[]> {
|
||||
protected async loadItems({ namespaces, api, reqInit }: KubeObjectStoreLoadingParams): Promise<T[]> {
|
||||
if (this.context?.cluster.isAllowedResource(api.kind)) {
|
||||
if (!api.isNamespaced) {
|
||||
return api.list({}, this.query);
|
||||
return api.list({ reqInit }, this.query);
|
||||
}
|
||||
|
||||
const isLoadingAll = this.context.allNamespaces?.length > 1
|
||||
@ -116,13 +119,13 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
if (isLoadingAll) {
|
||||
this.loadedNamespaces = [];
|
||||
|
||||
return api.list({}, this.query);
|
||||
return api.list({ reqInit }, this.query);
|
||||
} else {
|
||||
this.loadedNamespaces = namespaces;
|
||||
|
||||
return Promise // load resources per namespace
|
||||
.all(namespaces.map(namespace => api.list({ namespace })))
|
||||
.then(items => items.flat());
|
||||
.all(namespaces.map(namespace => api.list({ namespace, reqInit })))
|
||||
.then(items => items.flat().filter(Boolean));
|
||||
}
|
||||
}
|
||||
|
||||
@ -134,7 +137,7 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
}
|
||||
|
||||
@action
|
||||
async loadAll(options: { namespaces?: string[], merge?: boolean } = {}): Promise<void | T[]> {
|
||||
async loadAll(options: { namespaces?: string[], merge?: boolean, reqInit?: RequestInit } = {}): Promise<void | T[]> {
|
||||
await this.contextReady;
|
||||
this.isLoading = true;
|
||||
|
||||
@ -142,9 +145,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
const {
|
||||
namespaces = this.context.allNamespaces, // load all namespaces by default
|
||||
merge = true, // merge loaded items or return as result
|
||||
reqInit,
|
||||
} = options;
|
||||
|
||||
const items = await this.loadItems({ namespaces, api: this.api });
|
||||
const items = await this.loadItems({ namespaces, api: this.api, reqInit });
|
||||
|
||||
if (merge) {
|
||||
this.mergeItems(items, { replace: false });
|
||||
@ -157,7 +161,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
|
||||
return items;
|
||||
} catch (error) {
|
||||
console.error("Loading store items failed", { error, store: this });
|
||||
if (error.message) {
|
||||
Notifications.error(error.message);
|
||||
}
|
||||
console.error("Loading store items failed", { error });
|
||||
this.resetOnError(error);
|
||||
this.failedLoading = true;
|
||||
} finally {
|
||||
@ -274,17 +281,21 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
|
||||
subscribe(apis = this.getSubscribeApis()) {
|
||||
const abortController = new AbortController();
|
||||
const namespaces = [...this.loadedNamespaces];
|
||||
|
||||
if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) {
|
||||
apis.forEach(api => this.watchNamespace(api, "", abortController));
|
||||
} else {
|
||||
apis.forEach(api => {
|
||||
this.loadedNamespaces.forEach((namespace) => {
|
||||
this.watchNamespace(api, namespace, abortController);
|
||||
});
|
||||
});
|
||||
}
|
||||
// This waits for the context and namespaces to be ready or fails fast if the disposer is called
|
||||
Promise.race([rejectPromiseBy(abortController.signal), Promise.all([this.contextReady, this.namespacesReady])])
|
||||
.then(() => {
|
||||
if (this.context.cluster.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
|
||||
apis.forEach(api => this.watchNamespace(api, "", abortController));
|
||||
} else {
|
||||
apis.forEach(api => {
|
||||
this.loadedNamespaces.forEach((namespace) => {
|
||||
this.watchNamespace(api, namespace, abortController);
|
||||
});
|
||||
});
|
||||
}
|
||||
})
|
||||
.catch(noop); // ignore DOMExceptions
|
||||
|
||||
return () => {
|
||||
abortController.abort();
|
||||
@ -293,48 +304,39 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
|
||||
private watchNamespace(api: KubeApi<T>, namespace: string, abortController: AbortController) {
|
||||
let timedRetry: NodeJS.Timeout;
|
||||
const watch = () => api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
|
||||
abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry));
|
||||
const { signal } = abortController;
|
||||
|
||||
const callback = (data: IKubeWatchEvent, error: any) => {
|
||||
if (!this.isLoaded || abortController.signal.aborted) return;
|
||||
if (!this.isLoaded || error instanceof DOMException) return;
|
||||
|
||||
if (error instanceof Response) {
|
||||
if (error.status === 404) {
|
||||
// api has gone, let's not retry
|
||||
return;
|
||||
} else { // not sure what to do, best to retry
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
// not sure what to do, best to retry
|
||||
clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(watch, 5000);
|
||||
} else if (error instanceof KubeStatus && error.code === 410) {
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
clearTimeout(timedRetry);
|
||||
// resourceVersion has gone, let's try to reload
|
||||
timedRetry = setTimeout(() => {
|
||||
(namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
});
|
||||
(
|
||||
namespace
|
||||
? this.loadAll({ namespaces: [namespace], reqInit: { signal } })
|
||||
: this.loadAll({ merge: false, reqInit: { signal } })
|
||||
).then(watch);
|
||||
}, 1000);
|
||||
} else if(error) { // not sure what to do, best to retry
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
|
||||
timedRetry = setTimeout(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
}, 5000);
|
||||
} else if (error) { // not sure what to do, best to retry
|
||||
clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(watch, 5000);
|
||||
}
|
||||
|
||||
if (data) {
|
||||
@ -342,11 +344,8 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
}
|
||||
};
|
||||
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback: (data, error) => callback(data, error)
|
||||
});
|
||||
signal.addEventListener("abort", () => clearTimeout(timedRetry));
|
||||
watch();
|
||||
}
|
||||
|
||||
@action
|
||||
|
||||
Loading…
Reference in New Issue
Block a user