From 7d271a7b451a6682917b75d85db236d019751dd4 Mon Sep 17 00:00:00 2001 From: Sebastian Malton Date: Wed, 25 Jan 2023 07:30:18 -0800 Subject: [PATCH] Add more resiliancy to listing kube API resource kinds (#6995) Signed-off-by: Sebastian Malton Signed-off-by: Sebastian Malton --- packages/core/src/common/cluster-types.ts | 2 +- packages/core/src/common/cluster/cluster.ts | 41 +++++++---- .../core/src/common/utils/backoff-caller.ts | 70 +++++++++++++++++++ .../src/main/__test__/kube-auth-proxy.test.ts | 10 +-- .../request-api-resources.injectable.ts | 51 ++++++++++---- .../src/main/cluster/request-api-versions.ts | 3 +- .../request-core-api-versions.injectable.ts | 20 ++++-- ...quest-kube-api-resources-for.injectable.ts | 27 ++++--- ...equest-non-core-api-versions.injectable.ts | 24 +++++-- .../main/kube-auth-proxy/kube-auth-proxy.ts | 10 +-- .../cluster-manager/cluster-status.tsx | 18 ++--- 11 files changed, 207 insertions(+), 69 deletions(-) create mode 100644 packages/core/src/common/utils/backoff-caller.ts diff --git a/packages/core/src/common/cluster-types.ts b/packages/core/src/common/cluster-types.ts index 0cd447f0e2..3e904a385e 100644 --- a/packages/core/src/common/cluster-types.ts +++ b/packages/core/src/common/cluster-types.ts @@ -157,7 +157,7 @@ export enum ClusterStatus { */ export interface KubeAuthUpdate { message: string; - isError: boolean; + level: "info" | "warning" | "error"; } /** diff --git a/packages/core/src/common/cluster/cluster.ts b/packages/core/src/common/cluster/cluster.ts index 3f353dfee4..f00d9c3f89 100644 --- a/packages/core/src/common/cluster/cluster.ts +++ b/packages/core/src/common/cluster/cluster.ts @@ -356,7 +356,7 @@ export class Cluster implements ClusterModel { this.broadcastConnectUpdate("Starting connection ..."); await this.reconnect(); } catch (error) { - this.broadcastConnectUpdate(`Failed to start connection: ${error}`, true); + this.broadcastConnectUpdate(`Failed to start connection: ${error}`, "error"); return; } @@ -366,7 +366,7 @@ export class Cluster implements ClusterModel { this.broadcastConnectUpdate("Refreshing connection status ..."); await this.refreshConnectionStatus(); } catch (error) { - this.broadcastConnectUpdate(`Failed to connection status: ${error}`, true); + this.broadcastConnectUpdate(`Failed to connection status: ${error}`, "error"); return; } @@ -376,7 +376,7 @@ export class Cluster implements ClusterModel { this.broadcastConnectUpdate("Refreshing cluster accessibility ..."); await this.refreshAccessibility(); } catch (error) { - this.broadcastConnectUpdate(`Failed to refresh accessibility: ${error}`, true); + this.broadcastConnectUpdate(`Failed to refresh accessibility: ${error}`, "error"); return; } @@ -484,9 +484,20 @@ export class Cluster implements ClusterModel { resource: "*", }); this.allowedNamespaces.replace(await this.requestAllowedNamespaces(proxyConfig)); - this.knownResources.replace(await this.dependencies.requestApiResources(this)); + + const knownResources = await this.dependencies.requestApiResources(this); + + if (knownResources.callWasSuccessful) { + this.knownResources.replace(knownResources.response); + } else if (this.knownResources.length > 0) { + this.dependencies.logger.warn(`[CLUSTER]: failed to list KUBE resources, sticking with previous list`); + } else { + this.dependencies.logger.warn(`[CLUSTER]: failed to list KUBE resources for the first time, blocking connection to cluster...`); + this.broadcastConnectUpdate("Failed to list kube API resources, please reconnect...", "error"); + } + this.allowedResources.replace(await this.getAllowedResources(requestNamespaceListPermissions)); - this.ready = true; + this.ready = this.knownResources.length > 0; } /** @@ -536,37 +547,37 @@ export class Cluster implements ClusterModel { if (isRequestError(error)) { if (error.statusCode) { if (error.statusCode >= 400 && error.statusCode < 500) { - this.broadcastConnectUpdate("Invalid credentials", true); + this.broadcastConnectUpdate("Invalid credentials", "error"); return ClusterStatus.AccessDenied; } const message = String(error.error || error.message) || String(error); - this.broadcastConnectUpdate(message, true); + this.broadcastConnectUpdate(message, "error"); return ClusterStatus.Offline; } if (error.failed === true) { if (error.timedOut === true) { - this.broadcastConnectUpdate("Connection timed out", true); + this.broadcastConnectUpdate("Connection timed out", "error"); return ClusterStatus.Offline; } - this.broadcastConnectUpdate("Failed to fetch credentials", true); + this.broadcastConnectUpdate("Failed to fetch credentials", "error"); return ClusterStatus.AccessDenied; } const message = String(error.error || error.message) || String(error); - this.broadcastConnectUpdate(message, true); + this.broadcastConnectUpdate(message, "error"); } else if (error instanceof Error || typeof error === "string") { - this.broadcastConnectUpdate(`${error}`, true); + this.broadcastConnectUpdate(`${error}`, "error"); } else { - this.broadcastConnectUpdate("Unknown error has occurred", true); + this.broadcastConnectUpdate("Unknown error has occurred", "error"); } return ClusterStatus.Offline; @@ -636,8 +647,8 @@ export class Cluster implements ClusterModel { * broadcast an authentication update concerning this cluster * @internal */ - broadcastConnectUpdate(message: string, isError = false): void { - const update: KubeAuthUpdate = { message, isError }; + broadcastConnectUpdate(message: string, level: KubeAuthUpdate["level"] = "info"): void { + const update: KubeAuthUpdate = { message, level }; this.dependencies.logger.debug(`[CLUSTER]: broadcasting connection update`, { ...update, meta: this.getMeta() }); this.dependencies.broadcastMessage(`cluster:${this.id}:connection-update`, update); @@ -668,7 +679,7 @@ export class Cluster implements ClusterModel { } protected async getAllowedResources(requestNamespaceListPermissions: RequestNamespaceListPermissions) { - if (!this.allowedNamespaces.length) { + if (!this.allowedNamespaces.length || !this.knownResources.length) { return []; } diff --git a/packages/core/src/common/utils/backoff-caller.ts b/packages/core/src/common/utils/backoff-caller.ts new file mode 100644 index 0000000000..131565bb09 --- /dev/null +++ b/packages/core/src/common/utils/backoff-caller.ts @@ -0,0 +1,70 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ + +import type { AsyncResult } from "./async-result"; +import { delay } from "./delay"; +import { noop } from "./noop"; + +/** + * @param error The error that resulted in the failure + * @param attempt The 1-index attempt count + */ +export type OnIntermediateError = (error: E, attempt: number) => void; + +export interface BackoffCallerOptions { + /** + * Called when an attempt fails + */ + onIntermediateError?: OnIntermediateError; + + /** + * @default 5 + */ + maxAttempts?: number; + + /** + * In miliseconds + * @default 1000 + */ + initialTimeout?: number; + + /** + * @default 2 + */ + scaleFactor?: number; +} + +/** + * Calls `fn` once and then again (with exponential delay between each attempt) up to `options.maxAttempts` times. + * @param fn The function to repeatedly attempt + * @returns The first success or the last failure + */ +export const backoffCaller = async >(fn: () => Promise, options?: BackoffCallerOptions): Promise => { + const { + initialTimeout = 1000, + maxAttempts = 5, + onIntermediateError = noop as OnIntermediateError, + scaleFactor = 2, + } = options ?? {}; + + let timeout = initialTimeout; + let attempt = 0; + let result: R; + + do { + result = await fn(); + + if (result.callWasSuccessful) { + return result; + } + + onIntermediateError(result.error, attempt + 1); + + await delay(timeout); + timeout *= scaleFactor; + } while (attempt += 1, attempt < maxAttempts); + + return result; +}; diff --git a/packages/core/src/main/__test__/kube-auth-proxy.test.ts b/packages/core/src/main/__test__/kube-auth-proxy.test.ts index c37a7f254d..4f8bf9ba29 100644 --- a/packages/core/src/main/__test__/kube-auth-proxy.test.ts +++ b/packages/core/src/main/__test__/kube-auth-proxy.test.ts @@ -200,34 +200,34 @@ describe("kube auth proxy tests", () => { await proxy.run(); listeners.emit("error", { message: "foobarbat" }); - expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "foobarbat", isError: true }); + expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "foobarbat", level: "error" }); }); it("should call spawn and broadcast exit", async () => { await proxy.run(); listeners.emit("exit", 0); - expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "proxy exited with code: 0", isError: false }); + expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "proxy exited with code: 0", level: "info" }); }); it("should call spawn and broadcast errors from stderr", async () => { await proxy.run(); listeners.emit("stderr/data", "an error"); - expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "an error", isError: true }); + expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "an error", level: "error" }); }); it("should call spawn and broadcast stdout serving info", async () => { await proxy.run(); - expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "Authentication proxy started", isError: false }); + expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "Authentication proxy started", level: "info" }); }); it("should call spawn and broadcast stdout other info", async () => { await proxy.run(); listeners.emit("stdout/data", "some info"); - expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "some info", isError: false }); + expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "some info", level: "info" }); }); }); }); diff --git a/packages/core/src/main/cluster/request-api-resources.injectable.ts b/packages/core/src/main/cluster/request-api-resources.injectable.ts index bc996add60..552bd47800 100644 --- a/packages/core/src/main/cluster/request-api-resources.injectable.ts +++ b/packages/core/src/main/cluster/request-api-resources.injectable.ts @@ -10,8 +10,10 @@ import type { Cluster } from "../../common/cluster/cluster"; import { requestApiVersionsInjectionToken } from "./request-api-versions"; import { withConcurrencyLimit } from "../../common/utils/with-concurrency-limit"; import requestKubeApiResourcesForInjectable from "./request-kube-api-resources-for.injectable"; +import type { AsyncResult } from "../../common/utils/async-result"; +import { backoffCaller } from "../../common/utils/backoff-caller"; -export type RequestApiResources = (cluster: Cluster) => Promise; +export type RequestApiResources = (cluster: Cluster) => Promise>; export interface KubeResourceListGroup { group: string; @@ -25,23 +27,46 @@ const requestApiResourcesInjectable = getInjectable({ const apiVersionRequesters = di.injectMany(requestApiVersionsInjectionToken); const requestKubeApiResourcesFor = di.inject(requestKubeApiResourcesForInjectable); - return async (cluster) => { + return async (...args) => { + const [cluster] = args; const requestKubeApiResources = withConcurrencyLimit(5)(requestKubeApiResourcesFor(cluster)); - try { - const requests = await Promise.all(apiVersionRequesters.map(fn => fn(cluster))); - const resources = await Promise.all(( - requests - .flat() - .map(requestKubeApiResources) - )); + const groupLists: KubeResourceListGroup[] = []; - return resources.flat(); - } catch (error) { - logger.error(`[LIST-API-RESOURCES]: failed to list api resources: ${error}`); + for (const apiVersionRequester of apiVersionRequesters) { + const result = await backoffCaller(() => apiVersionRequester(cluster), { + onIntermediateError: (error, attempt) => { + cluster.broadcastConnectUpdate(`Failed to list kube API resource kinds, attempt ${attempt}: ${error}`, "warning"); + logger.warn(`[LIST-API-RESOURCES]: failed to list kube api resources: ${error}`, { attempt, clusterId: cluster.id }); + }, + }); - return []; + if (!result.callWasSuccessful) { + return result; + } + + groupLists.push(...result.response); } + + const apiResourceRequests = groupLists.map(async listGroup => ( + Object.assign(await requestKubeApiResources(listGroup), { listGroup }) + )); + const results = await Promise.all(apiResourceRequests); + const resources: KubeApiResource[] = []; + + for (const result of results) { + if (!result.callWasSuccessful) { + cluster.broadcastConnectUpdate(`Kube APIs under "${result.listGroup.path}" may not be displayed`, "warning"); + continue; + } + + resources.push(...result.response); + } + + return { + callWasSuccessful: true, + response: resources, + }; }; }, }); diff --git a/packages/core/src/main/cluster/request-api-versions.ts b/packages/core/src/main/cluster/request-api-versions.ts index af7bc7f232..be9ec2c21a 100644 --- a/packages/core/src/main/cluster/request-api-versions.ts +++ b/packages/core/src/main/cluster/request-api-versions.ts @@ -5,13 +5,14 @@ import { getInjectionToken } from "@ogre-tools/injectable"; import type { Cluster } from "../../common/cluster/cluster"; +import type { AsyncResult } from "../../common/utils/async-result"; export interface KubeResourceListGroup { group: string; path: string; } -export type RequestApiVersions = (cluster: Cluster) => Promise; +export type RequestApiVersions = (cluster: Cluster) => Promise>; export const requestApiVersionsInjectionToken = getInjectionToken({ id: "request-api-versions-token", diff --git a/packages/core/src/main/cluster/request-core-api-versions.injectable.ts b/packages/core/src/main/cluster/request-core-api-versions.injectable.ts index 4287eb4d3b..b709eb9356 100644 --- a/packages/core/src/main/cluster/request-core-api-versions.injectable.ts +++ b/packages/core/src/main/cluster/request-core-api-versions.injectable.ts @@ -13,12 +13,22 @@ const requestCoreApiVersionsInjectable = getInjectable({ const k8sRequest = di.inject(k8sRequestInjectable); return async (cluster) => { - const { versions } = await k8sRequest(cluster, "/api") as V1APIVersions; + try { + const { versions } = await k8sRequest(cluster, "/api") as V1APIVersions; - return versions.map(version => ({ - group: "", - path: `/api/${version}`, - })); + return { + callWasSuccessful: true, + response: versions.map(version => ({ + group: "", + path: `/api/${version}`, + })), + }; + } catch (error) { + return { + callWasSuccessful: false, + error: error as Error, + }; + } }; }, injectionToken: requestApiVersionsInjectionToken, diff --git a/packages/core/src/main/cluster/request-kube-api-resources-for.injectable.ts b/packages/core/src/main/cluster/request-kube-api-resources-for.injectable.ts index dfe1345db1..681f4ac013 100644 --- a/packages/core/src/main/cluster/request-kube-api-resources-for.injectable.ts +++ b/packages/core/src/main/cluster/request-kube-api-resources-for.injectable.ts @@ -6,10 +6,11 @@ import type { V1APIResourceList } from "@kubernetes/client-node"; import { getInjectable } from "@ogre-tools/injectable"; import type { Cluster } from "../../common/cluster/cluster"; import type { KubeApiResource } from "../../common/rbac"; +import type { AsyncResult } from "../../common/utils/async-result"; import k8sRequestInjectable from "../k8s-request.injectable"; import type { KubeResourceListGroup } from "./request-api-versions"; -export type RequestKubeApiResources = (grouping: KubeResourceListGroup) => Promise; +export type RequestKubeApiResources = (grouping: KubeResourceListGroup) => Promise>; export type RequestKubeApiResourcesFor = (cluster: Cluster) => RequestKubeApiResources; @@ -19,14 +20,24 @@ const requestKubeApiResourcesForInjectable = getInjectable({ const k8sRequest = di.inject(k8sRequestInjectable); return (cluster) => async ({ group, path }) => { - const { resources } = await k8sRequest(cluster, path) as V1APIResourceList; + try { + const { resources } = await k8sRequest(cluster, path) as V1APIResourceList; - return resources.map(resource => ({ - apiName: resource.name, - kind: resource.kind, - group, - namespaced: resource.namespaced, - })); + return { + callWasSuccessful: true, + response: resources.map(resource => ({ + apiName: resource.name, + kind: resource.kind, + group, + namespaced: resource.namespaced, + })), + }; + } catch (error) { + return { + callWasSuccessful: false, + error: error as Error, + }; + } }; }, }); diff --git a/packages/core/src/main/cluster/request-non-core-api-versions.injectable.ts b/packages/core/src/main/cluster/request-non-core-api-versions.injectable.ts index 5ca9e1495b..e5b241c75f 100644 --- a/packages/core/src/main/cluster/request-non-core-api-versions.injectable.ts +++ b/packages/core/src/main/cluster/request-non-core-api-versions.injectable.ts @@ -14,14 +14,24 @@ const requestNonCoreApiVersionsInjectable = getInjectable({ const k8sRequest = di.inject(k8sRequestInjectable); return async (cluster) => { - const { groups } = await k8sRequest(cluster, "/apis") as V1APIGroupList; + try { + const { groups } = await k8sRequest(cluster, "/apis") as V1APIGroupList; - return chain(groups.values()) - .filterMap(group => group.preferredVersion?.groupVersion && ({ - group: group.name, - path: `/apis/${group.preferredVersion.groupVersion}`, - })) - .collect(v => [...v]); + return { + callWasSuccessful: true, + response: chain(groups.values()) + .filterMap(group => group.preferredVersion?.groupVersion && ({ + group: group.name, + path: `/apis/${group.preferredVersion.groupVersion}`, + })) + .collect(v => [...v]), + }; + } catch (error) { + return { + callWasSuccessful: false, + error: error as Error, + }; + } }; }, injectionToken: requestApiVersionsInjectionToken, diff --git a/packages/core/src/main/kube-auth-proxy/kube-auth-proxy.ts b/packages/core/src/main/kube-auth-proxy/kube-auth-proxy.ts index 1d669f28c9..30e84988db 100644 --- a/packages/core/src/main/kube-auth-proxy/kube-auth-proxy.ts +++ b/packages/core/src/main/kube-auth-proxy/kube-auth-proxy.ts @@ -71,17 +71,17 @@ export class KubeAuthProxy { }, }); this.proxyProcess.on("error", (error) => { - this.cluster.broadcastConnectUpdate(error.message, true); + this.cluster.broadcastConnectUpdate(error.message, "error"); this.exit(); }); this.proxyProcess.on("exit", (code) => { - this.cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? code > 0: false); + this.cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? "error" : "info"); this.exit(); }); this.proxyProcess.on("disconnect", () => { - this.cluster.broadcastConnectUpdate("Proxy disconnected communications", true ); + this.cluster.broadcastConnectUpdate("Proxy disconnected communications", "error"); this.exit(); }); @@ -93,7 +93,7 @@ export class KubeAuthProxy { return; } - this.cluster.broadcastConnectUpdate(data.toString(), true); + this.cluster.broadcastConnectUpdate(data.toString(), "error"); }); this.proxyProcess.stdout.on("data", (data: Buffer) => { @@ -114,7 +114,7 @@ export class KubeAuthProxy { this.ready = true; } catch (error) { this.dependencies.logger.warn("[KUBE-AUTH-PROXY]: waitUntilUsed failed", error); - this.cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", true); + this.cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", "error"); this.exit(); return this.run(); diff --git a/packages/core/src/renderer/components/cluster-manager/cluster-status.tsx b/packages/core/src/renderer/components/cluster-manager/cluster-status.tsx index 1183b4f52b..940218ecad 100644 --- a/packages/core/src/renderer/components/cluster-manager/cluster-status.tsx +++ b/packages/core/src/renderer/components/cluster-manager/cluster-status.tsx @@ -11,7 +11,7 @@ import React from "react"; import { ipcRendererOn } from "../../../common/ipc"; import type { Cluster } from "../../../common/cluster/cluster"; import type { IClassName } from "../../utils"; -import { isBoolean, hasTypedProperty, isObject, isString, cssNames } from "../../utils"; +import { hasTypedProperty, isObject, isString, cssNames } from "../../utils"; import { Button } from "../button"; import { Icon } from "../icon"; import { Spinner } from "../spinner"; @@ -51,8 +51,8 @@ class NonInjectedClusterStatus extends React.Component isError); + @computed get hasErrorsOrWarnings(): boolean { + return this.authOutput.some(({ level }) => level !== "info"); } componentDidMount() { @@ -61,7 +61,7 @@ class NonInjectedClusterStatus extends React.Component { - this.authOutput.map(({ message, isError }, index) => ( -

+ this.authOutput.map(({ message, level }, index) => ( +

{message.trim()}

)) @@ -113,7 +113,7 @@ class NonInjectedClusterStatus extends React.Component; } @@ -131,7 +131,7 @@ class NonInjectedClusterStatus extends React.Component