1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

improve kube watch error handling

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2021-02-10 14:11:25 +02:00
parent 2b22ec0aa3
commit 0453a0f545
4 changed files with 118 additions and 48 deletions

View File

@ -8,7 +8,7 @@ import { apiManager } from "./api-manager";
import { apiKube } from "./index";
import { createKubeApiURL, parseKubeApi } from "./kube-api-parse";
import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api";
import { IKubeObjectConstructor, KubeObject } from "./kube-object";
import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object";
import byline from "byline";
import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
import { IKubeWatchEvent } from "./kube-watch-api";
@ -93,9 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) {
}
}
type KubeApiWatchOptions = {
export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void;
export type KubeApiWatchOptions = {
namespace: string;
callback?: (data: IKubeWatchEvent) => void;
callback?: KubeApiWatchCallback;
abortController?: AbortController
};
@ -370,8 +372,8 @@ export class KubeApi<T extends KubeObject = any> {
if (!opts.abortController) {
opts.abortController = new AbortController();
}
let errorReceived = false;
const { abortController, namespace, callback } = opts;
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, null, {
signal: abortController.signal
@ -379,53 +381,63 @@ export class KubeApi<T extends KubeObject = any> {
responsePromise.then((response) => {
if (!response.ok && !abortController.signal.aborted) {
if (response.status === 410) { // resourceVersion has gone
setTimeout(() => {
this.refreshResourceVersion().then(() => {
this.watch({...opts, abortController});
});
}, 1000);
} else if (response.status >= 500) { // k8s is having hard time
setTimeout(() => {
this.watch({...opts, abortController});
}, 5000);
if (callback) {
callback(null, response);
}
return;
}
const nodeStream = new ReadableWebToNodeStream(response.body);
nodeStream.on("end", () => {
if (errorReceived) return; // kubernetes errors should be handled in a callback
setTimeout(() => { // we did not get any kubernetes errors so let's retry
if (abortController.signal.aborted) return;
this.watch({...opts, namespace, callback});
}, 1000);
});
const stream = byline(nodeStream);
stream.on("data", (line) => {
try {
const event: IKubeWatchEvent = JSON.parse(line);
if (event.type === "ERROR" && event.object.kind === "Status") {
console.error(event);
errorReceived = true;
callback(null, new KubeStatus(event.object as any));
return;
}
this.modifyWatchEvent(event);
if (callback) {
callback(event);
callback(event, null);
}
} catch (ignore) {
// ignore parse errors
}
});
stream.on("close", () => {
setTimeout(() => {
if (!abortController.signal.aborted) this.watch({...opts, namespace, callback});
}, 1000);
});
}, (error) => {
if (error instanceof DOMException) return; // AbortController rejects, we can ignore it
console.error("watch rejected", error);
if (callback) {
callback(null, error);
}
}).catch((error) => {
console.error("watch error", error);
if (callback) {
callback(null, error);
}
});
const disposer = () => {
abortController.abort();
if (!abortController.signal.aborted) {
abortController.abort();
}
};
return disposer;

View File

@ -40,6 +40,26 @@ export interface IKubeObjectMetadata {
}[];
}
export interface IKubeStatus {
kind: string;
apiVersion: string;
code: number;
message?: string;
reason?: string;
}
export class KubeStatus implements IKubeStatus {
public readonly kind: string;
public readonly apiVersion: string;
public readonly code: number;
public readonly message: string;
public readonly reason: string;
constructor(data: IKubeStatus) {
Object.assign(this, data);
}
}
export type IKubeMetaField = keyof IKubeObjectMetadata;
@autobind()

View File

@ -12,7 +12,7 @@ import { KubeJsonApiData } from "./kube-json-api";
import { isDebugging, isProduction } from "../../common/vars";
export interface IKubeWatchEvent<T = KubeJsonApiData> {
type: "ADDED" | "MODIFIED" | "DELETED";
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
object?: T;
}
@ -32,8 +32,6 @@ export interface IKubeWatchLog {
@autobind()
export class KubeWatchApi {
@observable context: ClusterContext = null;
@observable subscribers = observable.map<KubeApi, number>();
@observable isConnected = false;
contextReady = when(() => Boolean(this.context));

View File

@ -2,7 +2,7 @@ import type { ClusterContext } from "./components/context";
import { action, computed, observable, reaction, when } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { KubeObject, KubeStatus } from "./api/kube-object";
import { IKubeWatchEvent } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
@ -149,8 +149,10 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
if (merge) {
this.mergeItems(items, { replace: false });
} else {
return items;
this.mergeItems(items, { replace: true });
}
return items;
} catch (error) {
console.error("Loading store items failed", { error, store: this });
this.resetOnError(error);
@ -267,31 +269,69 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
}
subscribe(apis = this.getSubscribeApis()) {
let disposers: {(): void}[] = [];
const abortController = new AbortController();
const namespaces = [...this.loadedNamespaces];
const callback = (data: IKubeWatchEvent) => {
if (!this.isLoaded) return;
this.eventsBuffer.push(data);
};
if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) {
disposers = apis.map(api => api.watch({
namespace: "",
callback: (data) => callback(data),
}));
if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) {
apis.forEach(api => this.watchNamespace(api, "", abortController));
} else {
apis.map(api => {
apis.forEach(api => {
this.loadedNamespaces.forEach((namespace) => {
disposers.push(api.watch({
namespace,
callback: (data) => callback(data)
}));
this.watchNamespace(api, namespace, abortController);
});
});
}
return () => disposers.forEach(dispose => dispose());
return () => {
abortController.abort();
};
}
private watchNamespace(api: KubeApi<T>, namespace: string, abortController: AbortController) {
const callback = (data: IKubeWatchEvent, error: any) => {
if (!this.isLoaded || abortController.signal.aborted) return;
if (error instanceof Response) {
if (error.status === 404) {
// api has gone, let's not retry
return;
} else { // not sure what to do, best to retry
setTimeout(() => {
if (abortController.signal.aborted) return;
api.watch({
namespace,
abortController,
callback
});
}, 5000);
}
} else if (error instanceof KubeStatus && error.code === 410) {
// resourceVersion has gone, let's try to reload
setTimeout(() => {
if (abortController.signal.aborted) return;
(namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => {
api.watch({
namespace,
abortController,
callback
});
});
}, 1000);
}
if (data) {
this.eventsBuffer.push(data);
}
};
api.watch({
namespace,
abortController,
callback: (data, error) => callback(data, error)
});
}
@action