mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Improve kube watch error handling (#2112)
* improve kube watch error handling Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * cleanup Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * review fixes Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * no more object.assign Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * keep only items from non-loaded namespaces Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * cleanup Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * fix sleep/resume Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com> * fallback retry only on error Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
parent
b873ba76bd
commit
d7d4be7375
@ -8,7 +8,7 @@ import { apiManager } from "./api-manager";
|
||||
import { apiKube } from "./index";
|
||||
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
|
||||
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
|
||||
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
|
||||
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object";
|
||||
import byline from "byline";
|
||||
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
|
||||
import { IKubeWatchEvent } from "./kube-watch-api";
|
||||
@ -93,9 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
|
||||
}
|
||||
}
|
||||
|
||||
type KubeApiWatchOptions = {
|
||||
export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void;
|
||||
|
||||
export type KubeApiWatchOptions = {
|
||||
namespace: string;
|
||||
callback?: (data: IKubeWatchEvent) => void;
|
||||
callback?: KubeApiWatchCallback;
|
||||
abortController?: AbortController
|
||||
};
|
||||
|
||||
@ -370,8 +372,8 @@ export class KubeApi<T extends KubeObject = any> {
|
||||
if (!opts.abortController) {
|
||||
opts.abortController = new AbortController();
|
||||
}
|
||||
let errorReceived = false;
|
||||
const { abortController, namespace, callback } = opts;
|
||||
|
||||
const watchUrl = this.getWatchUrl(namespace);
|
||||
const responsePromise = this.request.getResponse(watchUrl, null, {
|
||||
signal: abortController.signal
|
||||
@ -379,49 +381,50 @@ export class KubeApi<T extends KubeObject = any> {
|
||||
|
||||
responsePromise.then((response) => {
|
||||
if (!response.ok && !abortController.signal.aborted) {
|
||||
if (response.status === 410) { // resourceVersion has gone
|
||||
setTimeout(() => {
|
||||
this.refreshResourceVersion().then(() => {
|
||||
this.watch({...opts, abortController});
|
||||
});
|
||||
}, 1000);
|
||||
|
||||
} else if (response.status >= 500) { // k8s is having hard time
|
||||
setTimeout(() => {
|
||||
this.watch({...opts, abortController});
|
||||
}, 5000);
|
||||
}
|
||||
callback?.(null, response);
|
||||
|
||||
return;
|
||||
}
|
||||
const nodeStream = new ReadableWebToNodeStream(response.body);
|
||||
|
||||
nodeStream.on("end", () => {
|
||||
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
||||
|
||||
setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||
if (abortController.signal.aborted) return;
|
||||
|
||||
this.watch({...opts, namespace, callback});
|
||||
}, 1000);
|
||||
});
|
||||
|
||||
const stream = byline(nodeStream);
|
||||
|
||||
stream.on("data", (line) => {
|
||||
try {
|
||||
const event: IKubeWatchEvent = JSON.parse(line);
|
||||
|
||||
if (event.type === "ERROR" && event.object.kind === "Status") {
|
||||
errorReceived = true;
|
||||
callback(null, new KubeStatus(event.object as any));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this.modifyWatchEvent(event);
|
||||
|
||||
if (callback) {
|
||||
callback(event);
|
||||
callback(event, null);
|
||||
}
|
||||
} catch (ignore) {
|
||||
// ignore parse errors
|
||||
}
|
||||
});
|
||||
|
||||
stream.on("close", () => {
|
||||
setTimeout(() => {
|
||||
if (!abortController.signal.aborted) this.watch({...opts, namespace, callback});
|
||||
}, 1000);
|
||||
});
|
||||
}, (error) => {
|
||||
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
|
||||
|
||||
console.error("watch rejected", error);
|
||||
callback?.(null, error);
|
||||
}).catch((error) => {
|
||||
console.error("watch error", error);
|
||||
callback?.(null, error);
|
||||
});
|
||||
|
||||
const disposer = () => {
|
||||
|
||||
@ -40,6 +40,29 @@ export interface IKubeObjectMetadata {
|
||||
}[];
|
||||
}
|
||||
|
||||
export interface IKubeStatus {
|
||||
kind: string;
|
||||
apiVersion: string;
|
||||
code: number;
|
||||
message?: string;
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
export class KubeStatus {
|
||||
public readonly kind = "Status";
|
||||
public readonly apiVersion: string;
|
||||
public readonly code: number;
|
||||
public readonly message: string;
|
||||
public readonly reason: string;
|
||||
|
||||
constructor(data: IKubeStatus) {
|
||||
this.apiVersion = data.apiVersion;
|
||||
this.code = data.code;
|
||||
this.message = data.message || "";
|
||||
this.reason = data.reason || "";
|
||||
}
|
||||
}
|
||||
|
||||
export type IKubeMetaField = keyof IKubeObjectMetadata;
|
||||
|
||||
@autobind()
|
||||
|
||||
@ -12,7 +12,7 @@ import { KubeJsonApiData } from "./kube-json-api";
|
||||
import { isDebugging, isProduction } from "../../common/vars";
|
||||
|
||||
export interface IKubeWatchEvent<T = KubeJsonApiData> {
|
||||
type: "ADDED" | "MODIFIED" | "DELETED";
|
||||
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
|
||||
object?: T;
|
||||
}
|
||||
|
||||
@ -32,19 +32,9 @@ export interface IKubeWatchLog {
|
||||
@autobind()
|
||||
export class KubeWatchApi {
|
||||
@observable context: ClusterContext = null;
|
||||
@observable subscribers = observable.map<KubeApi, number>();
|
||||
@observable isConnected = false;
|
||||
|
||||
contextReady = when(() => Boolean(this.context));
|
||||
|
||||
constructor() {
|
||||
this.init();
|
||||
}
|
||||
|
||||
private async init() {
|
||||
await this.contextReady;
|
||||
}
|
||||
|
||||
isAllowedApi(api: KubeApi): boolean {
|
||||
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ import type { ClusterContext } from "./components/context";
|
||||
|
||||
import { action, computed, observable, reaction, when } from "mobx";
|
||||
import { autobind } from "./utils";
|
||||
import { KubeObject } from "./api/kube-object";
|
||||
import { KubeObject, KubeStatus } from "./api/kube-object";
|
||||
import { IKubeWatchEvent } from "./api/kube-watch-api";
|
||||
import { ItemStore } from "./item.store";
|
||||
import { apiManager } from "./api/api-manager";
|
||||
@ -149,8 +149,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
if (merge) {
|
||||
this.mergeItems(items, { replace: false });
|
||||
} else {
|
||||
return items;
|
||||
this.mergeItems(items, { replace: true });
|
||||
}
|
||||
|
||||
return items;
|
||||
} catch (error) {
|
||||
console.error("Loading store items failed", { error, store: this });
|
||||
this.resetOnError(error);
|
||||
@ -176,10 +178,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
|
||||
// update existing items
|
||||
if (!replace) {
|
||||
const partialIds = partialItems.map(item => item.getId());
|
||||
const namespaces = partialItems.map(item => item.getNs());
|
||||
|
||||
items = [
|
||||
...this.items.filter(existingItem => !partialIds.includes(existingItem.getId())),
|
||||
...this.items.filter(existingItem => !namespaces.includes(existingItem.getNs())),
|
||||
...partialItems,
|
||||
];
|
||||
}
|
||||
@ -267,31 +269,80 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
|
||||
}
|
||||
|
||||
subscribe(apis = this.getSubscribeApis()) {
|
||||
let disposers: {(): void}[] = [];
|
||||
const abortController = new AbortController();
|
||||
const namespaces = [...this.loadedNamespaces];
|
||||
|
||||
const callback = (data: IKubeWatchEvent) => {
|
||||
if (!this.isLoaded) return;
|
||||
|
||||
this.eventsBuffer.push(data);
|
||||
};
|
||||
|
||||
if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
|
||||
disposers = apis.map(api => api.watch({
|
||||
namespace: "",
|
||||
callback: (data) => callback(data),
|
||||
}));
|
||||
if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) {
|
||||
apis.forEach(api => this.watchNamespace(api, "", abortController));
|
||||
} else {
|
||||
apis.map(api => {
|
||||
apis.forEach(api => {
|
||||
this.loadedNamespaces.forEach((namespace) => {
|
||||
disposers.push(api.watch({
|
||||
namespace,
|
||||
callback: (data) => callback(data)
|
||||
}));
|
||||
this.watchNamespace(api, namespace, abortController);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
return () => disposers.forEach(dispose => dispose());
|
||||
return () => {
|
||||
abortController.abort();
|
||||
};
|
||||
}
|
||||
|
||||
private watchNamespace(api: KubeApi<T>, namespace: string, abortController: AbortController) {
|
||||
let timedRetry: NodeJS.Timeout;
|
||||
|
||||
abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry));
|
||||
|
||||
const callback = (data: IKubeWatchEvent, error: any) => {
|
||||
if (!this.isLoaded || abortController.signal.aborted) return;
|
||||
|
||||
if (error instanceof Response) {
|
||||
if (error.status === 404) {
|
||||
// api has gone, let's not retry
|
||||
return;
|
||||
} else { // not sure what to do, best to retry
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
}, 5000);
|
||||
}
|
||||
} else if (error instanceof KubeStatus && error.code === 410) {
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
// resourceVersion has gone, let's try to reload
|
||||
timedRetry = setTimeout(() => {
|
||||
(namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
});
|
||||
}, 1000);
|
||||
} else if(error) { // not sure what to do, best to retry
|
||||
if (timedRetry) clearTimeout(timedRetry);
|
||||
|
||||
timedRetry = setTimeout(() => {
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback
|
||||
});
|
||||
}, 5000);
|
||||
}
|
||||
|
||||
if (data) {
|
||||
this.eventsBuffer.push(data);
|
||||
}
|
||||
};
|
||||
|
||||
api.watch({
|
||||
namespace,
|
||||
abortController,
|
||||
callback: (data, error) => callback(data, error)
|
||||
});
|
||||
}
|
||||
|
||||
@action
|
||||
|
||||
Loading…
Reference in New Issue
Block a user