diff --git a/src/renderer/api/kube-api.ts b/src/renderer/api/kube-api.ts index 0be497a8f1..97cfb0522b 100644 --- a/src/renderer/api/kube-api.ts +++ b/src/renderer/api/kube-api.ts @@ -8,7 +8,7 @@ import { apiManager } from "./api-manager"; import { apiKube } from "./index"; import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; import { KubeJsonApi, KubeJsonApiData, KubeJsonApiDataList } from "./kube-json-api"; -import { IKubeObjectConstructor, KubeObject } from "./kube-object"; +import { IKubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object"; import byline from "byline"; import { ReadableWebToNodeStream } from "readable-web-to-node-stream"; import { IKubeWatchEvent } from "./kube-watch-api"; @@ -93,9 +93,11 @@ export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) { } } -type KubeApiWatchOptions = { +export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void; + +export type KubeApiWatchOptions = { namespace: string; - callback?: (data: IKubeWatchEvent) => void; + callback?: KubeApiWatchCallback; abortController?: AbortController }; @@ -370,8 +372,8 @@ export class KubeApi { if (!opts.abortController) { opts.abortController = new AbortController(); } + let errorReceived = false; const { abortController, namespace, callback } = opts; - const watchUrl = this.getWatchUrl(namespace); const responsePromise = this.request.getResponse(watchUrl, null, { signal: abortController.signal @@ -379,49 +381,50 @@ export class KubeApi { responsePromise.then((response) => { if (!response.ok && !abortController.signal.aborted) { - if (response.status === 410) { // resourceVersion has gone - setTimeout(() => { - this.refreshResourceVersion().then(() => { - this.watch({...opts, abortController}); - }); - }, 1000); - - } else if (response.status >= 500) { // k8s is having hard time - setTimeout(() => { - this.watch({...opts, abortController}); - }, 5000); - } + callback?.(null, response); return; } const nodeStream = new ReadableWebToNodeStream(response.body); + + nodeStream.on("end", () => { + if (errorReceived) return; // kubernetes errors should be handled in a callback + + setTimeout(() => { // we did not get any kubernetes errors so let's retry + if (abortController.signal.aborted) return; + + this.watch({...opts, namespace, callback}); + }, 1000); + }); + const stream = byline(nodeStream); stream.on("data", (line) => { try { const event: IKubeWatchEvent = JSON.parse(line); + if (event.type === "ERROR" && event.object.kind === "Status") { + errorReceived = true; + callback(null, new KubeStatus(event.object as any)); + + return; + } + this.modifyWatchEvent(event); if (callback) { - callback(event); + callback(event, null); } } catch (ignore) { // ignore parse errors } }); - - stream.on("close", () => { - setTimeout(() => { - if (!abortController.signal.aborted) this.watch({...opts, namespace, callback}); - }, 1000); - }); }, (error) => { if (error instanceof DOMException) return; // AbortController rejects, we can ignore it - console.error("watch rejected", error); + callback?.(null, error); }).catch((error) => { - console.error("watch error", error); + callback?.(null, error); }); const disposer = () => { diff --git a/src/renderer/api/kube-object.ts b/src/renderer/api/kube-object.ts index 08bd6401b9..d16d65b6da 100644 --- a/src/renderer/api/kube-object.ts +++ b/src/renderer/api/kube-object.ts @@ -40,6 +40,29 @@ export interface IKubeObjectMetadata { }[]; } +export interface IKubeStatus { + kind: string; + apiVersion: string; + code: number; + message?: string; + reason?: string; +} + +export class KubeStatus { + public readonly kind = "Status"; + public readonly apiVersion: string; + public readonly code: number; + public readonly message: string; + public readonly reason: string; + + constructor(data: IKubeStatus) { + this.apiVersion = data.apiVersion; + this.code = data.code; + this.message = data.message || ""; + this.reason = data.reason || ""; + } +} + export type IKubeMetaField = keyof IKubeObjectMetadata; @autobind() diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index 5523c5c5a9..f2f50193db 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -12,7 +12,7 @@ import { KubeJsonApiData } from "./kube-json-api"; import { isDebugging, isProduction } from "../../common/vars"; export interface IKubeWatchEvent { - type: "ADDED" | "MODIFIED" | "DELETED"; + type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR"; object?: T; } @@ -32,19 +32,9 @@ export interface IKubeWatchLog { @autobind() export class KubeWatchApi { @observable context: ClusterContext = null; - @observable subscribers = observable.map(); - @observable isConnected = false; contextReady = when(() => Boolean(this.context)); - constructor() { - this.init(); - } - - private async init() { - await this.contextReady; - } - isAllowedApi(api: KubeApi): boolean { return Boolean(this.context?.cluster.isAllowedResource(api.kind)); } diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 9a2a3da4e3..943aae7ad0 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -2,7 +2,7 @@ import type { ClusterContext } from "./components/context"; import { action, computed, observable, reaction, when } from "mobx"; import { autobind } from "./utils"; -import { KubeObject } from "./api/kube-object"; +import { KubeObject, KubeStatus } from "./api/kube-object"; import { IKubeWatchEvent } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; @@ -149,8 +149,10 @@ export abstract class KubeObjectStore extends ItemSt if (merge) { this.mergeItems(items, { replace: false }); } else { - return items; + this.mergeItems(items, { replace: true }); } + + return items; } catch (error) { console.error("Loading store items failed", { error, store: this }); this.resetOnError(error); @@ -176,10 +178,10 @@ export abstract class KubeObjectStore extends ItemSt // update existing items if (!replace) { - const partialIds = partialItems.map(item => item.getId()); + const namespaces = partialItems.map(item => item.getNs()); items = [ - ...this.items.filter(existingItem => !partialIds.includes(existingItem.getId())), + ...this.items.filter(existingItem => !namespaces.includes(existingItem.getNs())), ...partialItems, ]; } @@ -267,31 +269,80 @@ export abstract class KubeObjectStore extends ItemSt } subscribe(apis = this.getSubscribeApis()) { - let disposers: {(): void}[] = []; + const abortController = new AbortController(); + const namespaces = [...this.loadedNamespaces]; - const callback = (data: IKubeWatchEvent) => { - if (!this.isLoaded) return; - - this.eventsBuffer.push(data); - }; - - if (this.context.cluster?.isGlobalWatchEnabled && this.loadedNamespaces.length === 0) { - disposers = apis.map(api => api.watch({ - namespace: "", - callback: (data) => callback(data), - })); + if (this.context.cluster?.isGlobalWatchEnabled && namespaces.length === 0) { + apis.forEach(api => this.watchNamespace(api, "", abortController)); } else { - apis.map(api => { + apis.forEach(api => { this.loadedNamespaces.forEach((namespace) => { - disposers.push(api.watch({ - namespace, - callback: (data) => callback(data) - })); + this.watchNamespace(api, namespace, abortController); }); }); } - return () => disposers.forEach(dispose => dispose()); + return () => { + abortController.abort(); + }; + } + + private watchNamespace(api: KubeApi, namespace: string, abortController: AbortController) { + let timedRetry: NodeJS.Timeout; + + abortController.signal.addEventListener("abort", () => clearTimeout(timedRetry)); + + const callback = (data: IKubeWatchEvent, error: any) => { + if (!this.isLoaded || abortController.signal.aborted) return; + + if (error instanceof Response) { + if (error.status === 404) { + // api has gone, let's not retry + return; + } else { // not sure what to do, best to retry + if (timedRetry) clearTimeout(timedRetry); + timedRetry = setTimeout(() => { + api.watch({ + namespace, + abortController, + callback + }); + }, 5000); + } + } else if (error instanceof KubeStatus && error.code === 410) { + if (timedRetry) clearTimeout(timedRetry); + // resourceVersion has gone, let's try to reload + timedRetry = setTimeout(() => { + (namespace === "" ? this.loadAll({ merge: false }) : this.loadAll({namespaces: [namespace]})).then(() => { + api.watch({ + namespace, + abortController, + callback + }); + }); + }, 1000); + } else if(error) { // not sure what to do, best to retry + if (timedRetry) clearTimeout(timedRetry); + + timedRetry = setTimeout(() => { + api.watch({ + namespace, + abortController, + callback + }); + }, 5000); + } + + if (data) { + this.eventsBuffer.push(data); + } + }; + + api.watch({ + namespace, + abortController, + callback: (data, error) => callback(data, error) + }); } @action