1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Improve kube watch error handling (#2112)

* improve kube watch error handling

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* review fixes

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* no more object.assign

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* keep only items from non-loaded namespaces

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* cleanup

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* fix sleep/resume

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>

* fallback retry only on error

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-11 17:34:14 +02:00 committed by GitHub
parent b873ba76bd
commit d7d4be7375
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 125 additions and 58 deletions

View File

@ -8,7 +8,7 @@ import { apiManager } from "./api-manager";
import { apiKube } from "./index";
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object";
import byline from "byline";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { IKubeWatchEvent } from "./kube-watch-api";
@ -93,9 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
}
type KubeApiWatchOptions = {
export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void;
export type KubeApiWatchOptions = {
namespace: string;
callback?: (data: IKubeWatchEvent) => void;
callback?: KubeApiWatchCallback;
abortController?: AbortController
};
@ -370,8 +372,8 @@ export class KubeApi<T extends KubeObject = any> {
if (!opts.abortController) {
opts.abortController = new AbortController();
}
let errorReceived = false;
const { abortController, namespace, callback } = opts;
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, null, {
signal: abortController.signal
@ -379,49 +381,50 @@ export class KubeApi<T extends KubeObject = any> {
responsePromise.then((response) => {
if (!response.ok && !abortController.signal.aborted) {
if (response.status === 410) { // resourceVersion has gone
setTimeout(() => {
this.refreshResourceVersion().then(() => {
this.watch({...opts, abortController});
});
}, 1000);
} else if (response.status >= 500) { // k8s is having hard time
setTimeout(() => {
this.watch({...opts, abortController});
}, 5000);
}
callback?.(null, response);
return;
}
const nodeStream = new ReadableWebToNodeStream(response.body);
nodeStream.on("end", () => {
if (errorReceived) return; // kubernetes errors should be handled in a callback
setTimeout(() => { // we did not get any kubernetes errors so let's retry
if (abortController.signal.aborted) return;
this.watch({...opts, namespace, callback});
}, 1000);
});
const stream = byline(nodeStream);
stream.on("data", (line) => {
try {
const event: IKubeWatchEvent = JSON.parse(line);
if (event.type === "ERROR" && event.object.kind === "Status") {
errorReceived = true;
callback(null, new KubeStatus(event.object as any));
return;
}
this.modifyWatchEvent(event);
if (callback) {
callback(event);
callback(event, null);
}
} catch (ignore) {
// ignore parse errors
}
});
stream.on("close", () => {
setTimeout(() => {
if (!abortController.signal.aborted) this.watch({...opts, namespace, callback});
}, 1000);
});
}, (error) => {
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
console.error("watch rejected", error);
callback?.(null, error);
}).catch((error) => {
console.error("watch error", error);
callback?.(null, error);
});
const disposer = () => {

View File

@ -40,6 +40,29 @@ export interface IKubeObjectMetadata {
}[];
}
export interface IKubeStatus {
kind: string;
apiVersion: string;
code: number;
message?: string;
reason?: string;
}
export class KubeStatus {
public readonly kind = "Status";
public readonly apiVersion: string;
public readonly code: number;
public readonly message: string;
public readonly reason: string;
constructor(data: IKubeStatus) {
this.apiVersion = data.apiVersion;
this.code = data.code;
this.message = data.message || "";
this.reason = data.reason || "";
}
}
export type IKubeMetaField = keyof IKubeObjectMetadata;
@autobind()

View File

@ -12,7 +12,7 @@ import { KubeJsonApiData } from "./kube-json-api";
import { isDebugging, isProduction } from "../../common/vars";
export interface IKubeWatchEvent<T = KubeJsonApiData> {
type: "ADDED" | "MODIFIED" | "DELETED";
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
object?: T;
}
@ -32,19 +32,9 @@ export interface IKubeWatchLog {
@autobind()
export class KubeWatchApi {
@observable context: ClusterContext = null;
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
contextReady = when(() => Boolean(this.context));
constructor() {
this.init();
}
private async init() {
await this.contextReady;
}
isAllowedApi(api: KubeApi): boolean {
return Boolean(this.context?.cluster.isAllowedResource(api.kind));
}

View File

@ -2,7 +2,7 @@ import type { ClusterContext } from "./components/context";
import { action, computed, observable, reaction, when } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { KubeObject, KubeStatus } from "./api/kube-object";
import { IKubeWatchEvent } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
@ -149,8 +149,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
if (merge) {
this.mergeItems(items, { replace: false });
} else {
return items;
this.mergeItems(items, { replace: true });
}
return items;
} catch (error) {
console.error("Loading store items failed", { error, store: this });
this.resetOnError(error);
@ -176,10 +178,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
// update existing items
if (!replace) {
const partialIds = partialItems.map(item => item.getId());
const namespaces = partialItems.map(item => item.getNs());
items = [
...this.items.filter(existingItem => !partialIds.includes(existingItem.getId())),
...this.items.filter(existingItem => !namespaces.includes(existingItem.getNs())),
...partialItems,
];
}
@ -267,31 +269,80 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
subscribe(apis = this.getSubscribeApis()) {
let disposers: {(): void}[] = [];
const abortController = new AbortController();
const namespaces = [...this.loadedNamespaces];
const callback = (data: IKubeWatchEvent) => {
if (!this.isLoaded) return;
this.eventsBuffer.push(data);
};
if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
disposers = apis.map(api => api.watch({
namespace: "",
callback: (data) => callback(data),
}));
if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) {
apis.forEach(api => this.watchNamespace(api, "", abortController));
} else {
apis.map(api => {
apis.forEach(api => {
this.loadedNamespaces.forEach((namespace) => {
disposers.push(api.watch({
namespace,
callback: (data) => callback(data)
}));
this.watchNamespace(api, namespace, abortController);
});
});
}
return () => disposers.forEach(dispose => dispose());
return () => {
abortController.abort();
};
}
private watchNamespace(api: KubeApi<T>, namespace: string, abortController: AbortController) {
let timedRetry: NodeJS.Timeout;
abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry));
const callback = (data: IKubeWatchEvent, error: any) => {
if (!this.isLoaded || abortController.signal.aborted) return;
if (error instanceof Response) {
if (error.status === 404) {
// api has gone, let's not retry
return;
} else { // not sure what to do, best to retry
if (timedRetry) clearTimeout(timedRetry);
timedRetry = setTimeout(() => {
api.watch({
namespace,
abortController,
callback
});
}, 5000);
}
} else if (error instanceof KubeStatus && error.code === 410) {
if (timedRetry) clearTimeout(timedRetry);
// resourceVersion has gone, let's try to reload
timedRetry = setTimeout(() => {
(namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => {
api.watch({
namespace,
abortController,
callback
});
});
}, 1000);
} else if(error) { // not sure what to do, best to retry
if (timedRetry) clearTimeout(timedRetry);
timedRetry = setTimeout(() => {
api.watch({
namespace,
abortController,
callback
});
}, 5000);
}
if (data) {
this.eventsBuffer.push(data);
}
};
api.watch({
namespace,
abortController,
callback: (data, error) => callback(data, error)
});
}
@action