1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Add more resiliancy to listing kube API resource kinds (#6995)

Signed-off-by: Sebastian Malton <sebastian@malton.name>

Signed-off-by: Sebastian Malton <sebastian@malton.name>
This commit is contained in:
Sebastian Malton 2023-01-25 07:30:18 -08:00 committed by GitHub
parent 7e8ae3fded
commit 7d271a7b45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 207 additions and 69 deletions

View File

@ -157,7 +157,7 @@ export enum ClusterStatus {
*/
export interface KubeAuthUpdate {
message: string;
isError: boolean;
level: "info" | "warning" | "error";
}
/**

View File

@ -356,7 +356,7 @@ export class Cluster implements ClusterModel {
this.broadcastConnectUpdate("Starting connection ...");
await this.reconnect();
} catch (error) {
this.broadcastConnectUpdate(`Failed to start connection: ${error}`, true);
this.broadcastConnectUpdate(`Failed to start connection: ${error}`, "error");
return;
}
@ -366,7 +366,7 @@ export class Cluster implements ClusterModel {
this.broadcastConnectUpdate("Refreshing connection status ...");
await this.refreshConnectionStatus();
} catch (error) {
this.broadcastConnectUpdate(`Failed to connection status: ${error}`, true);
this.broadcastConnectUpdate(`Failed to connection status: ${error}`, "error");
return;
}
@ -376,7 +376,7 @@ export class Cluster implements ClusterModel {
this.broadcastConnectUpdate("Refreshing cluster accessibility ...");
await this.refreshAccessibility();
} catch (error) {
this.broadcastConnectUpdate(`Failed to refresh accessibility: ${error}`, true);
this.broadcastConnectUpdate(`Failed to refresh accessibility: ${error}`, "error");
return;
}
@ -484,9 +484,20 @@ export class Cluster implements ClusterModel {
resource: "*",
});
this.allowedNamespaces.replace(await this.requestAllowedNamespaces(proxyConfig));
this.knownResources.replace(await this.dependencies.requestApiResources(this));
const knownResources = await this.dependencies.requestApiResources(this);
if (knownResources.callWasSuccessful) {
this.knownResources.replace(knownResources.response);
} else if (this.knownResources.length > 0) {
this.dependencies.logger.warn(`[CLUSTER]: failed to list KUBE resources, sticking with previous list`);
} else {
this.dependencies.logger.warn(`[CLUSTER]: failed to list KUBE resources for the first time, blocking connection to cluster...`);
this.broadcastConnectUpdate("Failed to list kube API resources, please reconnect...", "error");
}
this.allowedResources.replace(await this.getAllowedResources(requestNamespaceListPermissions));
this.ready = true;
this.ready = this.knownResources.length > 0;
}
/**
@ -536,37 +547,37 @@ export class Cluster implements ClusterModel {
if (isRequestError(error)) {
if (error.statusCode) {
if (error.statusCode >= 400 && error.statusCode < 500) {
this.broadcastConnectUpdate("Invalid credentials", true);
this.broadcastConnectUpdate("Invalid credentials", "error");
return ClusterStatus.AccessDenied;
}
const message = String(error.error || error.message) || String(error);
this.broadcastConnectUpdate(message, true);
this.broadcastConnectUpdate(message, "error");
return ClusterStatus.Offline;
}
if (error.failed === true) {
if (error.timedOut === true) {
this.broadcastConnectUpdate("Connection timed out", true);
this.broadcastConnectUpdate("Connection timed out", "error");
return ClusterStatus.Offline;
}
this.broadcastConnectUpdate("Failed to fetch credentials", true);
this.broadcastConnectUpdate("Failed to fetch credentials", "error");
return ClusterStatus.AccessDenied;
}
const message = String(error.error || error.message) || String(error);
this.broadcastConnectUpdate(message, true);
this.broadcastConnectUpdate(message, "error");
} else if (error instanceof Error || typeof error === "string") {
this.broadcastConnectUpdate(`${error}`, true);
this.broadcastConnectUpdate(`${error}`, "error");
} else {
this.broadcastConnectUpdate("Unknown error has occurred", true);
this.broadcastConnectUpdate("Unknown error has occurred", "error");
}
return ClusterStatus.Offline;
@ -636,8 +647,8 @@ export class Cluster implements ClusterModel {
* broadcast an authentication update concerning this cluster
* @internal
*/
broadcastConnectUpdate(message: string, isError = false): void {
const update: KubeAuthUpdate = { message, isError };
broadcastConnectUpdate(message: string, level: KubeAuthUpdate["level"] = "info"): void {
const update: KubeAuthUpdate = { message, level };
this.dependencies.logger.debug(`[CLUSTER]: broadcasting connection update`, { ...update, meta: this.getMeta() });
this.dependencies.broadcastMessage(`cluster:${this.id}:connection-update`, update);
@ -668,7 +679,7 @@ export class Cluster implements ClusterModel {
}
protected async getAllowedResources(requestNamespaceListPermissions: RequestNamespaceListPermissions) {
if (!this.allowedNamespaces.length) {
if (!this.allowedNamespaces.length || !this.knownResources.length) {
return [];
}

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import type { AsyncResult } from "./async-result";
import { delay } from "./delay";
import { noop } from "./noop";
/**
* @param error The error that resulted in the failure
* @param attempt The 1-index attempt count
*/
export type OnIntermediateError<E> = (error: E, attempt: number) => void;
export interface BackoffCallerOptions<E> {
/**
* Called when an attempt fails
*/
onIntermediateError?: OnIntermediateError<E>;
/**
* @default 5
*/
maxAttempts?: number;
/**
* In miliseconds
* @default 1000
*/
initialTimeout?: number;
/**
* @default 2
*/
scaleFactor?: number;
}
/**
* Calls `fn` once and then again (with exponential delay between each attempt) up to `options.maxAttempts` times.
* @param fn The function to repeatedly attempt
* @returns The first success or the last failure
*/
export const backoffCaller = async <T, E, R extends AsyncResult<T, E>>(fn: () => Promise<R>, options?: BackoffCallerOptions<E>): Promise<R> => {
const {
initialTimeout = 1000,
maxAttempts = 5,
onIntermediateError = noop as OnIntermediateError<E>,
scaleFactor = 2,
} = options ?? {};
let timeout = initialTimeout;
let attempt = 0;
let result: R;
do {
result = await fn();
if (result.callWasSuccessful) {
return result;
}
onIntermediateError(result.error, attempt + 1);
await delay(timeout);
timeout *= scaleFactor;
} while (attempt += 1, attempt < maxAttempts);
return result;
};

View File

@ -200,34 +200,34 @@ describe("kube auth proxy tests", () => {
await proxy.run();
listeners.emit("error", { message: "foobarbat" });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "foobarbat", isError: true });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "foobarbat", level: "error" });
});
it("should call spawn and broadcast exit", async () => {
await proxy.run();
listeners.emit("exit", 0);
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "proxy exited with code: 0", isError: false });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "proxy exited with code: 0", level: "info" });
});
it("should call spawn and broadcast errors from stderr", async () => {
await proxy.run();
listeners.emit("stderr/data", "an error");
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "an error", isError: true });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "an error", level: "error" });
});
it("should call spawn and broadcast stdout serving info", async () => {
await proxy.run();
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "Authentication proxy started", isError: false });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "Authentication proxy started", level: "info" });
});
it("should call spawn and broadcast stdout other info", async () => {
await proxy.run();
listeners.emit("stdout/data", "some info");
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "some info", isError: false });
expect(broadcastMessageMock).toBeCalledWith("cluster:foobar:connection-update", { message: "some info", level: "info" });
});
});
});

View File

@ -10,8 +10,10 @@ import type { Cluster } from "../../common/cluster/cluster";
import { requestApiVersionsInjectionToken } from "./request-api-versions";
import { withConcurrencyLimit } from "../../common/utils/with-concurrency-limit";
import requestKubeApiResourcesForInjectable from "./request-kube-api-resources-for.injectable";
import type { AsyncResult } from "../../common/utils/async-result";
import { backoffCaller } from "../../common/utils/backoff-caller";
export type RequestApiResources = (cluster: Cluster) => Promise<KubeApiResource[]>;
export type RequestApiResources = (cluster: Cluster) => Promise<AsyncResult<KubeApiResource[], Error>>;
export interface KubeResourceListGroup {
group: string;
@ -25,23 +27,46 @@ const requestApiResourcesInjectable = getInjectable({
const apiVersionRequesters = di.injectMany(requestApiVersionsInjectionToken);
const requestKubeApiResourcesFor = di.inject(requestKubeApiResourcesForInjectable);
return async (cluster) => {
return async (...args) => {
const [cluster] = args;
const requestKubeApiResources = withConcurrencyLimit(5)(requestKubeApiResourcesFor(cluster));
try {
const requests = await Promise.all(apiVersionRequesters.map(fn => fn(cluster)));
const resources = await Promise.all((
requests
.flat()
.map(requestKubeApiResources)
));
const groupLists: KubeResourceListGroup[] = [];
return resources.flat();
} catch (error) {
logger.error(`[LIST-API-RESOURCES]: failed to list api resources: ${error}`);
for (const apiVersionRequester of apiVersionRequesters) {
const result = await backoffCaller(() => apiVersionRequester(cluster), {
onIntermediateError: (error, attempt) => {
cluster.broadcastConnectUpdate(`Failed to list kube API resource kinds, attempt ${attempt}: ${error}`, "warning");
logger.warn(`[LIST-API-RESOURCES]: failed to list kube api resources: ${error}`, { attempt, clusterId: cluster.id });
},
});
return [];
if (!result.callWasSuccessful) {
return result;
}
groupLists.push(...result.response);
}
const apiResourceRequests = groupLists.map(async listGroup => (
Object.assign(await requestKubeApiResources(listGroup), { listGroup })
));
const results = await Promise.all(apiResourceRequests);
const resources: KubeApiResource[] = [];
for (const result of results) {
if (!result.callWasSuccessful) {
cluster.broadcastConnectUpdate(`Kube APIs under "${result.listGroup.path}" may not be displayed`, "warning");
continue;
}
resources.push(...result.response);
}
return {
callWasSuccessful: true,
response: resources,
};
};
},
});

View File

@ -5,13 +5,14 @@
import { getInjectionToken } from "@ogre-tools/injectable";
import type { Cluster } from "../../common/cluster/cluster";
import type { AsyncResult } from "../../common/utils/async-result";
export interface KubeResourceListGroup {
group: string;
path: string;
}
export type RequestApiVersions = (cluster: Cluster) => Promise<KubeResourceListGroup[]>;
export type RequestApiVersions = (cluster: Cluster) => Promise<AsyncResult<KubeResourceListGroup[], Error>>;
export const requestApiVersionsInjectionToken = getInjectionToken<RequestApiVersions>({
id: "request-api-versions-token",

View File

@ -13,12 +13,22 @@ const requestCoreApiVersionsInjectable = getInjectable({
const k8sRequest = di.inject(k8sRequestInjectable);
return async (cluster) => {
const { versions } = await k8sRequest(cluster, "/api") as V1APIVersions;
try {
const { versions } = await k8sRequest(cluster, "/api") as V1APIVersions;
return versions.map(version => ({
group: "",
path: `/api/${version}`,
}));
return {
callWasSuccessful: true,
response: versions.map(version => ({
group: "",
path: `/api/${version}`,
})),
};
} catch (error) {
return {
callWasSuccessful: false,
error: error as Error,
};
}
};
},
injectionToken: requestApiVersionsInjectionToken,

View File

@ -6,10 +6,11 @@ import type { V1APIResourceList } from "@kubernetes/client-node";
import { getInjectable } from "@ogre-tools/injectable";
import type { Cluster } from "../../common/cluster/cluster";
import type { KubeApiResource } from "../../common/rbac";
import type { AsyncResult } from "../../common/utils/async-result";
import k8sRequestInjectable from "../k8s-request.injectable";
import type { KubeResourceListGroup } from "./request-api-versions";
export type RequestKubeApiResources = (grouping: KubeResourceListGroup) => Promise<KubeApiResource[]>;
export type RequestKubeApiResources = (grouping: KubeResourceListGroup) => Promise<AsyncResult<KubeApiResource[], Error>>;
export type RequestKubeApiResourcesFor = (cluster: Cluster) => RequestKubeApiResources;
@ -19,14 +20,24 @@ const requestKubeApiResourcesForInjectable = getInjectable({
const k8sRequest = di.inject(k8sRequestInjectable);
return (cluster) => async ({ group, path }) => {
const { resources } = await k8sRequest(cluster, path) as V1APIResourceList;
try {
const { resources } = await k8sRequest(cluster, path) as V1APIResourceList;
return resources.map(resource => ({
apiName: resource.name,
kind: resource.kind,
group,
namespaced: resource.namespaced,
}));
return {
callWasSuccessful: true,
response: resources.map(resource => ({
apiName: resource.name,
kind: resource.kind,
group,
namespaced: resource.namespaced,
})),
};
} catch (error) {
return {
callWasSuccessful: false,
error: error as Error,
};
}
};
},
});

View File

@ -14,14 +14,24 @@ const requestNonCoreApiVersionsInjectable = getInjectable({
const k8sRequest = di.inject(k8sRequestInjectable);
return async (cluster) => {
const { groups } = await k8sRequest(cluster, "/apis") as V1APIGroupList;
try {
const { groups } = await k8sRequest(cluster, "/apis") as V1APIGroupList;
return chain(groups.values())
.filterMap(group => group.preferredVersion?.groupVersion && ({
group: group.name,
path: `/apis/${group.preferredVersion.groupVersion}`,
}))
.collect(v => [...v]);
return {
callWasSuccessful: true,
response: chain(groups.values())
.filterMap(group => group.preferredVersion?.groupVersion && ({
group: group.name,
path: `/apis/${group.preferredVersion.groupVersion}`,
}))
.collect(v => [...v]),
};
} catch (error) {
return {
callWasSuccessful: false,
error: error as Error,
};
}
};
},
injectionToken: requestApiVersionsInjectionToken,

View File

@ -71,17 +71,17 @@ export class KubeAuthProxy {
},
});
this.proxyProcess.on("error", (error) => {
this.cluster.broadcastConnectUpdate(error.message, true);
this.cluster.broadcastConnectUpdate(error.message, "error");
this.exit();
});
this.proxyProcess.on("exit", (code) => {
this.cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? code > 0: false);
this.cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? "error" : "info");
this.exit();
});
this.proxyProcess.on("disconnect", () => {
this.cluster.broadcastConnectUpdate("Proxy disconnected communications", true );
this.cluster.broadcastConnectUpdate("Proxy disconnected communications", "error");
this.exit();
});
@ -93,7 +93,7 @@ export class KubeAuthProxy {
return;
}
this.cluster.broadcastConnectUpdate(data.toString(), true);
this.cluster.broadcastConnectUpdate(data.toString(), "error");
});
this.proxyProcess.stdout.on("data", (data: Buffer) => {
@ -114,7 +114,7 @@ export class KubeAuthProxy {
this.ready = true;
} catch (error) {
this.dependencies.logger.warn("[KUBE-AUTH-PROXY]: waitUntilUsed failed", error);
this.cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", true);
this.cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", "error");
this.exit();
return this.run();

View File

@ -11,7 +11,7 @@ import React from "react";
import { ipcRendererOn } from "../../../common/ipc";
import type { Cluster } from "../../../common/cluster/cluster";
import type { IClassName } from "../../utils";
import { isBoolean, hasTypedProperty, isObject, isString, cssNames } from "../../utils";
import { hasTypedProperty, isObject, isString, cssNames } from "../../utils";
import { Button } from "../button";
import { Icon } from "../icon";
import { Spinner } from "../spinner";
@ -51,8 +51,8 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
return this.props.entityRegistry.getById(this.cluster.id);
}
@computed get hasErrors(): boolean {
return this.authOutput.some(({ isError }) => isError);
@computed get hasErrorsOrWarnings(): boolean {
return this.authOutput.some(({ level }) => level !== "info");
}
componentDidMount() {
@ -61,7 +61,7 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
if (
isObject(res)
&& hasTypedProperty(res, "message", isString)
&& hasTypedProperty(res, "isError", isBoolean)
&& hasTypedProperty(res, "level", function (val): val is KubeAuthUpdate["level"] { return ["info", "warning", "error"].includes(val as string); })
) {
this.authOutput.push(res);
} else {
@ -87,7 +87,7 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
} catch (error) {
this.authOutput.push({
message: String(error),
isError: true,
level: "error",
});
} finally {
this.isReconnecting = false;
@ -102,8 +102,8 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
return (
<pre>
{
this.authOutput.map(({ message, isError }, index) => (
<p key={index} className={cssNames({ error: isError })}>
this.authOutput.map(({ message, level }, index) => (
<p key={index} className={cssNames({ error: level === "error", warning: level === "warning" })}>
{message.trim()}
</p>
))
@ -113,7 +113,7 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
}
renderStatusIcon() {
if (this.hasErrors) {
if (this.hasErrorsOrWarnings) {
return <Icon material="cloud_off" className={styles.icon} />;
}
@ -131,7 +131,7 @@ class NonInjectedClusterStatus extends React.Component<ClusterStatusProps & Depe
}
renderReconnectionHelp() {
if (this.hasErrors && !this.isReconnecting) {
if (this.hasErrorsOrWarnings && !this.isReconnecting) {
return (
<>
<Button