mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Improve kube watch error handling (#2112)
* improve kube watch error handling Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * cleanup Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * review fixes Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * no more object.assign Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * keep only items from non-loaded namespaces Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * cleanup Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * fix sleep/resume Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * fallback retry only on error Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
parent
b873ba76bd
commit
d7d4be7375
@ -8,7 +8,7 @@ import { apiManager } from "./api-manager";
|
|||||||
import { apiKube } from "./index";
|
import { apiKube } from "./index";
|
||||||
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
|
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
|
||||||
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
|
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
|
||||||
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
|
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object";
|
||||||
import byline from "byline";
|
import byline from "byline";
|
||||||
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
|
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
|
||||||
import { IKubeWatchEvent } from "./kube-watch-api";
|
import { IKubeWatchEvent } from "./kube-watch-api";
|
||||||
@ -93,9 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type KubeApiWatchOptions = {
|
export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void;
|
||||||
|
|
||||||
|
export type KubeApiWatchOptions = {
|
||||||
namespace: string;
|
namespace: string;
|
||||||
callback?: (data: IKubeWatchEvent) => void;
|
callback?: KubeApiWatchCallback;
|
||||||
abortController?: AbortController
|
abortController?: AbortController
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -370,8 +372,8 @@ export class KubeApi<T extends KubeObject = any> {
|
|||||||
if (!opts.abortController) {
|
if (!opts.abortController) {
|
||||||
opts.abortController = new AbortController();
|
opts.abortController = new AbortController();
|
||||||
}
|
}
|
||||||
|
let errorReceived = false;
|
||||||
const { abortController, namespace, callback } = opts;
|
const { abortController, namespace, callback } = opts;
|
||||||
|
|
||||||
const watchUrl = this.getWatchUrl(namespace);
|
const watchUrl = this.getWatchUrl(namespace);
|
||||||
const responsePromise = this.request.getResponse(watchUrl, null, {
|
const responsePromise = this.request.getResponse(watchUrl, null, {
|
||||||
signal: abortController.signal
|
signal: abortController.signal
|
||||||
@ -379,49 +381,50 @@ export class KubeApi<T extends KubeObject = any> {
|
|||||||
|
|
||||||
responsePromise.then((response) => {
|
responsePromise.then((response) => {
|
||||||
if (!response.ok && !abortController.signal.aborted) {
|
if (!response.ok && !abortController.signal.aborted) {
|
||||||
if (response.status === 410) { // resourceVersion has gone
|
callback?.(null, response);
|
||||||
setTimeout(() => {
|
|
||||||
this.refreshResourceVersion().then(() => {
|
|
||||||
this.watch({...opts, abortController});
|
|
||||||
});
|
|
||||||
}, 1000);
|
|
||||||
|
|
||||||
} else if (response.status >= 500) { // k8s is having hard time
|
|
||||||
setTimeout(() => {
|
|
||||||
this.watch({...opts, abortController});
|
|
||||||
}, 5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const nodeStream = new ReadableWebToNodeStream(response.body);
|
const nodeStream = new ReadableWebToNodeStream(response.body);
|
||||||
|
|
||||||
|
nodeStream.on("end", () => {
|
||||||
|
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
||||||
|
|
||||||
|
setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||||
|
if (abortController.signal.aborted) return;
|
||||||
|
|
||||||
|
this.watch({...opts, namespace, callback});
|
||||||
|
}, 1000);
|
||||||
|
});
|
||||||
|
|
||||||
const stream = byline(nodeStream);
|
const stream = byline(nodeStream);
|
||||||
|
|
||||||
stream.on("data", (line) => {
|
stream.on("data", (line) => {
|
||||||
try {
|
try {
|
||||||
const event: IKubeWatchEvent = JSON.parse(line);
|
const event: IKubeWatchEvent = JSON.parse(line);
|
||||||
|
|
||||||
|
if (event.type === "ERROR" && event.object.kind === "Status") {
|
||||||
|
errorReceived = true;
|
||||||
|
callback(null, new KubeStatus(event.object as any));
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
this.modifyWatchEvent(event);
|
this.modifyWatchEvent(event);
|
||||||
|
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback(event);
|
callback(event, null);
|
||||||
}
|
}
|
||||||
} catch (ignore) {
|
} catch (ignore) {
|
||||||
// ignore parse errors
|
// ignore parse errors
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
stream.on("close", () => {
|
|
||||||
setTimeout(() => {
|
|
||||||
if (!abortController.signal.aborted) this.watch({...opts, namespace, callback});
|
|
||||||
}, 1000);
|
|
||||||
});
|
|
||||||
}, (error) => {
|
}, (error) => {
|
||||||
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
|
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
|
||||||
|
|
||||||
console.error("watch rejected", error);
|
callback?.(null, error);
|
||||||
}).catch((error) => {
|
}).catch((error) => {
|
||||||
console.error("watch error", error);
|
callback?.(null, error);
|
||||||
});
|
});
|
||||||
|
|
||||||
const disposer = () => {
|
const disposer = () => {
|
||||||
|
|||||||
@ -40,6 +40,29 @@ export interface IKubeObjectMetadata {
|
|||||||
}[];
|
}[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface IKubeStatus {
|
||||||
|
kind: string;
|
||||||
|
apiVersion: string;
|
||||||
|
code: number;
|
||||||
|
message?: string;
|
||||||
|
reason?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class KubeStatus {
|
||||||
|
public readonly kind = "Status";
|
||||||
|
public readonly apiVersion: string;
|
||||||
|
public readonly code: number;
|
||||||
|
public readonly message: string;
|
||||||
|
public readonly reason: string;
|
||||||
|
|
||||||
|
constructor(data: IKubeStatus) {
|
||||||
|
this.apiVersion = data.apiVersion;
|
||||||
|
this.code = data.code;
|
||||||
|
this.message = data.message || "";
|
||||||
|
this.reason = data.reason || "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export type IKubeMetaField = keyof IKubeObjectMetadata;
|
export type IKubeMetaField = keyof IKubeObjectMetadata;
|
||||||
|
|
||||||
@autobind()
|
@autobind()
|
||||||
|
|||||||
@ -12,7 +12,7 @@ import { KubeJsonApiData } from "./kube-json-api";
|
|||||||
import { isDebugging, isProduction } from "../../common/vars";
|
import { isDebugging, isProduction } from "../../common/vars";
|
||||||
|
|
||||||
export interface IKubeWatchEvent<T = KubeJsonApiData> {
|
export interface IKubeWatchEvent<T = KubeJsonApiData> {
|
||||||
type: "ADDED" | "MODIFIED" | "DELETED";
|
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
|
||||||
object?: T;
|
object?: T;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -32,19 +32,9 @@ export interface IKubeWatchLog {
|
|||||||
@autobind()
|
@autobind()
|
||||||
export class KubeWatchApi {
|
export class KubeWatchApi {
|
||||||
@observable context: ClusterContext = null;
|
@observable context: ClusterContext = null;
|
||||||
@observable subscribers = observable.map<KubeApi, number>();
|
|
||||||
@observable isConnected = false;
|
|
||||||
|
|
||||||
contextReady = when(() => Boolean(this.context));
|
contextReady = when(() => Boolean(this.context));
|
||||||
|
|
||||||
constructor() {
|
|
||||||
this.init();
|
|
||||||
}
|
|
||||||
|
|
||||||
private async init() {
|
|
||||||
await this.contextReady;
|
|
||||||
}
|
|
||||||
|
|
||||||
isAllowedApi(api: KubeApi): boolean {
|
isAllowedApi(api: KubeApi): boolean {
|
||||||
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
|
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,7 +2,7 @@ import type { ClusterContext } from "./components/context";
|
|||||||
|
|
||||||
import { action, computed, observable, reaction, when } from "mobx";
|
import { action, computed, observable, reaction, when } from "mobx";
|
||||||
import { autobind } from "./utils";
|
import { autobind } from "./utils";
|
||||||
import { KubeObject } from "./api/kube-object";
|
import { KubeObject, KubeStatus } from "./api/kube-object";
|
||||||
import { IKubeWatchEvent } from "./api/kube-watch-api";
|
import { IKubeWatchEvent } from "./api/kube-watch-api";
|
||||||
import { ItemStore } from "./item.store";
|
import { ItemStore } from "./item.store";
|
||||||
import { apiManager } from "./api/api-manager";
|
import { apiManager } from "./api/api-manager";
|
||||||
@ -149,8 +149,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
|||||||
if (merge) {
|
if (merge) {
|
||||||
this.mergeItems(items, { replace: false });
|
this.mergeItems(items, { replace: false });
|
||||||
} else {
|
} else {
|
||||||
return items;
|
this.mergeItems(items, { replace: true });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return items;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Loading store items failed", { error, store: this });
|
console.error("Loading store items failed", { error, store: this });
|
||||||
this.resetOnError(error);
|
this.resetOnError(error);
|
||||||
@ -176,10 +178,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
|||||||
|
|
||||||
// update existing items
|
// update existing items
|
||||||
if (!replace) {
|
if (!replace) {
|
||||||
const partialIds = partialItems.map(item => item.getId());
|
const namespaces = partialItems.map(item => item.getNs());
|
||||||
|
|
||||||
items = [
|
items = [
|
||||||
...this.items.filter(existingItem => !partialIds.includes(existingItem.getId())),
|
...this.items.filter(existingItem => !namespaces.includes(existingItem.getNs())),
|
||||||
...partialItems,
|
...partialItems,
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
@ -267,31 +269,80 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
|||||||
}
|
}
|
||||||
|
|
||||||
subscribe(apis = this.getSubscribeApis()) {
|
subscribe(apis = this.getSubscribeApis()) {
|
||||||
let disposers: {(): void}[] = [];
|
const abortController = new AbortController();
|
||||||
|
const namespaces = [...this.loadedNamespaces];
|
||||||
|
|
||||||
const callback = (data: IKubeWatchEvent) => {
|
if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) {
|
||||||
if (!this.isLoaded) return;
|
apis.forEach(api => this.watchNamespace(api, "", abortController));
|
||||||
|
|
||||||
this.eventsBuffer.push(data);
|
|
||||||
};
|
|
||||||
|
|
||||||
if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
|
|
||||||
disposers = apis.map(api => api.watch({
|
|
||||||
namespace: "",
|
|
||||||
callback: (data) => callback(data),
|
|
||||||
}));
|
|
||||||
} else {
|
} else {
|
||||||
apis.map(api => {
|
apis.forEach(api => {
|
||||||
this.loadedNamespaces.forEach((namespace) => {
|
this.loadedNamespaces.forEach((namespace) => {
|
||||||
disposers.push(api.watch({
|
this.watchNamespace(api, namespace, abortController);
|
||||||
namespace,
|
|
||||||
callback: (data) => callback(data)
|
|
||||||
}));
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return () => disposers.forEach(dispose => dispose());
|
return () => {
|
||||||
|
abortController.abort();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private watchNamespace(api: KubeApi<T>, namespace: string, abortController: AbortController) {
|
||||||
|
let timedRetry: NodeJS.Timeout;
|
||||||
|
|
||||||
|
abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry));
|
||||||
|
|
||||||
|
const callback = (data: IKubeWatchEvent, error: any) => {
|
||||||
|
if (!this.isLoaded || abortController.signal.aborted) return;
|
||||||
|
|
||||||
|
if (error instanceof Response) {
|
||||||
|
if (error.status === 404) {
|
||||||
|
// api has gone, let's not retry
|
||||||
|
return;
|
||||||
|
} else { // not sure what to do, best to retry
|
||||||
|
if (timedRetry) clearTimeout(timedRetry);
|
||||||
|
timedRetry = setTimeout(() => {
|
||||||
|
api.watch({
|
||||||
|
namespace,
|
||||||
|
abortController,
|
||||||
|
callback
|
||||||
|
});
|
||||||
|
}, 5000);
|
||||||
|
}
|
||||||
|
} else if (error instanceof KubeStatus && error.code === 410) {
|
||||||
|
if (timedRetry) clearTimeout(timedRetry);
|
||||||
|
// resourceVersion has gone, let's try to reload
|
||||||
|
timedRetry = setTimeout(() => {
|
||||||
|
(namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => {
|
||||||
|
api.watch({
|
||||||
|
namespace,
|
||||||
|
abortController,
|
||||||
|
callback
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}, 1000);
|
||||||
|
} else if(error) { // not sure what to do, best to retry
|
||||||
|
if (timedRetry) clearTimeout(timedRetry);
|
||||||
|
|
||||||
|
timedRetry = setTimeout(() => {
|
||||||
|
api.watch({
|
||||||
|
namespace,
|
||||||
|
abortController,
|
||||||
|
callback
|
||||||
|
});
|
||||||
|
}, 5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data) {
|
||||||
|
this.eventsBuffer.push(data);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
api.watch({
|
||||||
|
namespace,
|
||||||
|
abortController,
|
||||||
|
callback: (data, error) => callback(data, error)
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@action
|
@action
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user