1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

More work to get tests to pass in a more realistic way

Signed-off-by: Sebastian Malton <sebastian@malton.name>
This commit is contained in:
Sebastian Malton 2023-01-09 15:27:34 -05:00
parent e88743d65f
commit 488546123a
20 changed files with 383 additions and 239 deletions

View File

@ -3,7 +3,15 @@
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { action, comparer, computed, makeObservable, observable, reaction, when } from "mobx";
import {
action,
comparer,
computed,
makeObservable,
observable,
reaction,
when,
} from "mobx";
import type { ClusterContextHandler } from "../../main/context-handler/context-handler";
import type { KubeConfig } from "@kubernetes/client-node";
import { HttpError } from "@kubernetes/client-node";
@ -14,8 +22,25 @@ import { formatKubeApiResource } from "../rbac";
import type { VersionDetector } from "../../main/cluster-detectors/version-detector";
import type { DetectorRegistry } from "../../main/cluster-detectors/detector-registry";
import plimit from "p-limit";
import type { ClusterState, ClusterMetricsResourceType, ClusterId, ClusterMetadata, ClusterModel, ClusterPreferences, ClusterPrometheusPreferences, UpdateClusterModel, KubeAuthUpdate, ClusterConfigData } from "../cluster-types";
import { ClusterMetadataKey, initialNodeShellImage, ClusterStatus, clusterModelIdChecker, updateClusterModelChecker } from "../cluster-types";
import type {
ClusterState,
ClusterMetricsResourceType,
ClusterId,
ClusterMetadata,
ClusterModel,
ClusterPreferences,
ClusterPrometheusPreferences,
UpdateClusterModel,
KubeAuthUpdate,
ClusterConfigData,
} from "../cluster-types";
import {
ClusterMetadataKey,
initialNodeShellImage,
ClusterStatus,
clusterModelIdChecker,
updateClusterModelChecker,
} from "../cluster-types";
import { disposer, isDefined, isRequestError, toJS } from "../utils";
import { clusterListNamespaceForbiddenChannel } from "../ipc/cluster";
import type { CanI } from "./authorization-review.injectable";
@ -24,8 +49,12 @@ import assert from "assert";
import type { Logger } from "../logger";
import type { BroadcastMessage } from "../ipc/broadcast-message.injectable";
import type { LoadConfigfromFile } from "../kube-helpers/load-config-from-file.injectable";
import type { RequestNamespaceListPermissions, RequestNamespaceListPermissionsFor } from "./request-namespace-list-permissions.injectable";
import type {
RequestNamespaceListPermissions,
RequestNamespaceListPermissionsFor,
} from "./request-namespace-list-permissions.injectable";
import type { RequestApiResources } from "../../main/cluster/request-api-resources.injectable";
import type { SendClusterConnectUpdate } from "./send-cluster-connect-update.injectable";
export interface ClusterDependencies {
readonly directoryForKubeConfigs: string;
@ -41,6 +70,7 @@ export interface ClusterDependencies {
createVersionDetector: (cluster: Cluster) => VersionDetector;
broadcastMessage: BroadcastMessage;
loadConfigfromFile: LoadConfigfromFile;
sendClusterConnectUpdate: SendClusterConnectUpdate;
}
/**
@ -236,7 +266,11 @@ export class Cluster implements ClusterModel {
return this.preferences.defaultNamespace;
}
constructor(private readonly dependencies: ClusterDependencies, { id, ...model }: ClusterModel, configData: ClusterConfigData) {
constructor(
private readonly dependencies: ClusterDependencies,
{ id, ...model }: ClusterModel,
configData: ClusterConfigData,
) {
makeObservable(this);
const { error } = clusterModelIdChecker.validate({ id });
@ -306,18 +340,27 @@ export class Cluster implements ClusterModel {
*/
protected bindEvents() {
this.dependencies.logger.info(`[CLUSTER]: bind events`, this.getMeta());
const refreshTimer = setInterval(() => !this.disconnected && this.refresh(), 30000); // every 30s
const refreshMetadataTimer = setInterval(() => this.available && this.refreshAccessibilityAndMetadata(), 900000); // every 15 minutes
const refreshTimer = setInterval(
() => !this.disconnected && this.refresh(),
30_000,
);
const refreshMetadataTimer = setInterval(
() => this.available && this.refreshAccessibilityAndMetadata(),
15 * 60_1000,
);
this.eventsDisposer.push(
reaction(
() => this.prometheusPreferences,
prefs => this.contextHandler.setupPrometheus(prefs),
(prefs) => this.contextHandler.setupPrometheus(prefs),
{ equals: comparer.structural },
),
() => clearInterval(refreshTimer),
() => clearInterval(refreshMetadataTimer),
reaction(() => this.defaultNamespace, () => this.recreateProxyKubeconfig()),
reaction(
() => this.defaultNamespace,
() => this.recreateProxyKubeconfig(),
),
);
}
@ -351,11 +394,14 @@ export class Cluster implements ClusterModel {
this.bindEvents();
}
console.log("before reconnect");
if (this.disconnected || !this.accessible) {
try {
this.broadcastConnectUpdate("Starting connection ...");
await this.reconnect();
} catch (error) {
console.log(error);
this.broadcastConnectUpdate(`Failed to start connection: ${error}`, true);
return;
@ -382,8 +428,13 @@ export class Cluster implements ClusterModel {
}
// download kubectl in background, so it's not blocking dashboard
this.ensureKubectl()
.catch(error => this.dependencies.logger.warn(`[CLUSTER]: failed to download kubectl for clusterId=${this.id}`, error));
(async () => {
try {
await this.ensureKubectl();
} catch (error) {
this.dependencies.logger.warn(`[CLUSTER]: failed to download kubectl for clusterId=${this.id}`, error);
}
})();
this.broadcastConnectUpdate("Connected, waiting for view to load ...");
}
@ -416,7 +467,7 @@ export class Cluster implements ClusterModel {
*/
@action disconnect(): void {
if (this.disconnected) {
return void this.dependencies.logger.debug("[CLUSTER]: already disconnected", { id: this.id });
return this.dependencies.logger.debug("[CLUSTER]: already disconnected", { id: this.id });
}
this.dependencies.logger.info(`[CLUSTER]: disconnecting`, { id: this.id });
@ -443,57 +494,58 @@ export class Cluster implements ClusterModel {
/**
* @internal
*/
@action
@action
async refreshAccessibilityAndMetadata() {
await this.refreshAccessibility();
await this.refreshMetadata();
}
/**
/**
* @internal
*/
async refreshMetadata() {
this.dependencies.logger.info(`[CLUSTER]: refreshMetadata`, this.getMeta());
const metadata = await this.dependencies.detectorRegistry.detectForCluster(this);
const existingMetadata = this.metadata;
async refreshMetadata() {
this.dependencies.logger.info(`[CLUSTER]: refreshMetadata`, this.getMeta());
const metadata = await this.dependencies.detectorRegistry.detectForCluster(this);
const existingMetadata = this.metadata;
this.metadata = Object.assign(existingMetadata, metadata);
}
this.metadata = Object.assign(existingMetadata, metadata);
}
/**
/**
* @internal
*/
private async refreshAccessibility(): Promise<void> {
this.dependencies.logger.info(`[CLUSTER]: refreshAccessibility`, this.getMeta());
const proxyConfig = await this.getProxyKubeconfig();
const canI = this.dependencies.createAuthorizationReview(proxyConfig);
const requestNamespaceListPermissions = this.dependencies.requestNamespaceListPermissionsFor(proxyConfig);
private async refreshAccessibility(): Promise<void> {
console.log("in refreshAccessibility");
this.dependencies.logger.info(`[CLUSTER]: refreshAccessibility`, this.getMeta());
const proxyConfig = await this.getProxyKubeconfig();
const canI = this.dependencies.createAuthorizationReview(proxyConfig);
const requestNamespaceListPermissions = this.dependencies.requestNamespaceListPermissionsFor(proxyConfig);
this.isAdmin = await canI({
namespace: "kube-system",
resource: "*",
verb: "create",
});
this.isGlobalWatchEnabled = await canI({
verb: "watch",
resource: "*",
});
this.allowedNamespaces.replace(await this.requestAllowedNamespaces(proxyConfig));
this.knownResources.replace(await this.dependencies.requestApiResources(this));
this.allowedResources.replace(await this.getAllowedResources(requestNamespaceListPermissions));
this.ready = true;
}
this.isAdmin = await canI({
namespace: "kube-system",
resource: "*",
verb: "create",
});
this.isGlobalWatchEnabled = await canI({
verb: "watch",
resource: "*",
});
this.allowedNamespaces.replace(await this.requestAllowedNamespaces(proxyConfig));
this.knownResources.replace(await this.dependencies.requestApiResources(this));
this.allowedResources.replace(await this.getAllowedResources(requestNamespaceListPermissions));
this.ready = true;
}
/**
* @internal
*/
@action
async refreshConnectionStatus() {
const connectionStatus = await this.getConnectionStatus();
async refreshConnectionStatus() {
const connectionStatus = await this.getConnectionStatus();
this.online = connectionStatus > ClusterStatus.Offline;
this.accessible = connectionStatus == ClusterStatus.AccessGranted;
}
this.online = connectionStatus > ClusterStatus.Offline;
this.accessible = connectionStatus == ClusterStatus.AccessGranted;
}
async getKubeconfig(): Promise<KubeConfig> {
const { config } = await this.dependencies.loadConfigfromFile(this.kubeConfigPath);
@ -527,6 +579,7 @@ export class Cluster implements ClusterModel {
return ClusterStatus.AccessGranted;
} catch (error) {
console.log(error);
this.dependencies.logger.error(`[CLUSTER]: Failed to connect to "${this.contextName}": ${error}`);
if (isRequestError(error)) {
@ -634,7 +687,7 @@ export class Cluster implements ClusterModel {
const update: KubeAuthUpdate = { message, isError };
this.dependencies.logger.debug(`[CLUSTER]: broadcasting connection update`, { ...update, meta: this.getMeta() });
this.dependencies.broadcastMessage(`cluster:${this.id}:connection-update`, update);
this.dependencies.sendClusterConnectUpdate(update);
}
protected async requestAllowedNamespaces(proxyConfig: KubeConfig) {
@ -650,11 +703,21 @@ export class Cluster implements ClusterModel {
const ctx = proxyConfig.getContextObject(this.contextName);
const namespaceList = [ctx?.namespace].filter(isDefined);
if (namespaceList.length === 0 && error instanceof HttpError && error.statusCode === 403) {
if (
namespaceList.length === 0 &&
error instanceof HttpError &&
error.statusCode === 403
) {
const { response } = error as HttpError & { response: { body: unknown }};
this.dependencies.logger.info("[CLUSTER]: listing namespaces is forbidden, broadcasting", { clusterId: this.id, error: response.body });
this.dependencies.broadcastMessage(clusterListNamespaceForbiddenChannel, this.id);
this.dependencies.logger.info("[CLUSTER]: listing namespaces is forbidden, broadcasting", {
clusterId: this.id,
error: response.body,
});
this.dependencies.broadcastMessage(
clusterListNamespaceForbiddenChannel,
this.id,
);
}
return namespaceList;
@ -668,12 +731,14 @@ export class Cluster implements ClusterModel {
try {
const apiLimit = plimit(5); // 5 concurrent api requests
const canListResourceCheckers = await Promise.all((
this.allowedNamespaces.map(namespace => apiLimit(() => requestNamespaceListPermissions(namespace)))
));
const canListResourceCheckers = await Promise.all(
this.allowedNamespaces.map((namespace) =>
apiLimit(() => requestNamespaceListPermissions(namespace)),
),
);
return this.knownResources
.filter((resource) => canListResourceCheckers.some(fn => fn(resource)))
.filter((resource) => canListResourceCheckers.some((fn) => fn(resource)))
.map(formatKubeApiResource);
} catch (error) {
return [];
@ -681,6 +746,11 @@ export class Cluster implements ClusterModel {
}
shouldShowResource(resource: KubeApiResourceDescriptor): boolean {
// console.log({
// resource,
// allowed: toJS([...this.allowedResources]),
// });
return this.allowedResources.has(formatKubeApiResource(resource));
}

View File

@ -0,0 +1,9 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getGlobalOverride } from "../test-utils/get-global-override";
import sendClusterConnectUpdateInjectable from "./send-cluster-connect-update.injectable";
export default getGlobalOverride(sendClusterConnectUpdateInjectable, () => () => {});

View File

@ -0,0 +1,23 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable";
import type { KubeAuthUpdate } from "../cluster-types";
import broadcastMessageInjectable from "../ipc/broadcast-message.injectable";
export type SendClusterConnectUpdate = (update: KubeAuthUpdate) => void;
const sendClusterConnectUpdateInjectable = getInjectable({
id: "send-cluster-connect-update",
instantiate: (di, clusterId): SendClusterConnectUpdate => {
const broadcastMessage = di.inject(broadcastMessageInjectable);
return (update) => broadcastMessage(`cluster:${clusterId}:connection-update`, update);
},
lifecycle: lifecycleEnum.keyedSingleton({
getInstanceKey: (di, clusterId: string) => clusterId,
}),
});
export default sendClusterConnectUpdateInjectable;

View File

@ -3,6 +3,7 @@
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import type { IObservableValue } from "mobx";
import { runInAction } from "mobx";
import { inspect } from "util";
import { isDefined } from "./type-narrowing";
@ -23,6 +24,21 @@ export function getOrInsert<K, V>(map: Map<K, V>, key: K, value: V): V {
return map.get(key)!;
}
export async function getOrInsertWithObservable<K>(box: IObservableValue<K | undefined>, getter: () => Promise<K>): Promise<K> {
const val = box.get();
if (val) {
return val;
}
const newVal = await getter();
runInAction(() => box.set(newVal));
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return box.get()!;
}
/**
* Updates map and returns the value that was just inserted
*/

View File

@ -2,7 +2,7 @@
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getGlobalOverride } from "../../../common/test-utils/get-global-override";
import { getGlobalOverride } from "../../test-utils/get-global-override";
import waitUntilPortIsUsedInjectable from "./wait-until-port-is-used.injectable";
export default getGlobalOverride(

View File

@ -64,6 +64,7 @@ describe("cluster/namespaces - edit namespace from new tab", () => {
windowDi.override(callForPatchResourceInjectable, () => callForPatchResourceMock);
});
builder.namespaces.add("default");
builder.allowKubeResource({
apiName: "namespaces",
group: "v1",

View File

@ -369,7 +369,6 @@ export const setupInitializingApplicationBuilder = (init: (builder: ApplicationB
));
mainDi.override(requestNamespaceListPermissionsForInjectable, () => () => async () => (resource) => allowedResourcesState.has(formatKubeApiResource(resource)));
runInAction(() => {
mainDi.register(getInjectable({
id: "create-fake-cluster",
@ -584,7 +583,7 @@ export const setupInitializingApplicationBuilder = (init: (builder: ApplicationB
setEnvironmentToClusterFrame: () => {
environment = environments.clusterFrame;
builder.beforeWindowStart((windowDi) => {
builder.beforeWindowStart(async (windowDi) => {
windowDi.override(hostedClusterIdInjectable, () => clusterId);
// TODO: Figure out a way to remove this stub.
@ -606,6 +605,8 @@ export const setupInitializingApplicationBuilder = (init: (builder: ApplicationB
isSelectedAll: () => false,
getTotalCount: () => namespaceItems.length,
} as Partial<NamespaceStore> as NamespaceStore));
await clusters.get(clusterId)?.activate();
});
builder.afterWindowStart(windowDi => {

View File

@ -3,7 +3,7 @@
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import waitUntilPortIsUsedInjectable from "../kube-auth-proxy/wait-until-port-is-used/wait-until-port-is-used.injectable";
import waitUntilPortIsUsedInjectable from "../../common/utils/wait-until-port-is-used/wait-until-port-is-used.injectable";
import type { Cluster } from "../../common/cluster/cluster";
import type { KubeAuthProxy } from "../kube-auth-proxy/kube-auth-proxy";
import type { ChildProcess } from "child_process";

View File

@ -9,7 +9,6 @@ export type Spawn = typeof spawn;
const spawnInjectable = getInjectable({
id: "spawn",
instantiate: (): Spawn => spawn,
causesSideEffects: true,
});

View File

@ -0,0 +1,12 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import type { Cluster } from "../../common/cluster/cluster";
export const getProxyEnv = (cluster: Cluster) => cluster.preferences.httpsProxy
? {
HTTPS_PROXY: cluster.preferences.httpsProxy,
}
: {};

View File

@ -10,11 +10,12 @@ import type httpProxy from "http-proxy";
import type { UrlWithStringQuery } from "url";
import url from "url";
import { CoreV1Api } from "@kubernetes/client-node";
import type { KubeAuthProxy } from "../kube-auth-proxy/kube-auth-proxy";
import type { CreateKubeAuthProxy } from "../kube-auth-proxy/create-kube-auth-proxy.injectable";
import type { KubeAuthProxy, CreateKubeAuthProxy } from "../kube-auth-proxy/create-kube-auth-proxy.injectable";
import type { GetPrometheusProviderByKind } from "../prometheus/get-by-kind.injectable";
import type { IComputedValue } from "mobx";
import type { Logger } from "../../common/logger";
import type { KubeAuthProxyProcess } from "../kube-auth-proxy/spawn-proxy.injectable";
import { waitUntilDefined } from "../../common/utils";
export interface PrometheusDetails {
prometheusPath: string;
@ -181,22 +182,14 @@ export class ContextHandler implements ClusterContextHandler {
};
}
protected async ensureServerHelper(): Promise<KubeAuthProxy> {
protected async ensureServerHelper(): Promise<KubeAuthProxyProcess> {
if (!this.kubeAuthProxy) {
const proxyEnv = Object.assign({}, process.env);
this.kubeAuthProxy = this.dependencies.createKubeAuthProxy(this.cluster);
if (this.cluster.preferences.httpsProxy) {
proxyEnv.HTTPS_PROXY = this.cluster.preferences.httpsProxy;
}
this.kubeAuthProxy = this.dependencies.createKubeAuthProxy(this.cluster, proxyEnv);
await this.kubeAuthProxy.run();
return this.kubeAuthProxy;
return this.kubeAuthProxy.run();
} else {
return waitUntilDefined(this.kubeAuthProxy.proxyProcess);
}
await this.kubeAuthProxy.whenReady;
return this.kubeAuthProxy;
}
async ensureServer(): Promise<void> {

View File

@ -5,17 +5,12 @@
import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable";
import { computed } from "mobx";
import { shouldShowResourceInjectionToken } from "../../common/cluster-store/allowed-resources-injection-token";
import type { KubeApiResourceDescriptor } from "../../common/rbac";
import { formatKubeApiResource } from "../../common/rbac";
// TODO: Figure out implementation for this later.
const allowedResourcesInjectable = getInjectable({
id: "allowed-resources",
instantiate: () => computed(() => false),
injectionToken: shouldShowResourceInjectionToken,
lifecycle: lifecycleEnum.keyedSingleton({
getInstanceKey: (di, resource: KubeApiResourceDescriptor) => formatKubeApiResource(resource),
}),
lifecycle: lifecycleEnum.singleton,
});
export default allowedResourcesInjectable;

View File

@ -19,12 +19,13 @@ import createVersionDetectorInjectable from "../cluster-detectors/create-version
import broadcastMessageInjectable from "../../common/ipc/broadcast-message.injectable";
import loadConfigfromFileInjectable from "../../common/kube-helpers/load-config-from-file.injectable";
import requestNamespaceListPermissionsForInjectable from "../../common/cluster/request-namespace-list-permissions.injectable";
import sendClusterConnectUpdateInjectable from "../../common/cluster/send-cluster-connect-update.injectable";
const createClusterInjectable = getInjectable({
id: "create-cluster",
instantiate: (di) => {
const dependencies: ClusterDependencies = {
const dependencies: Omit<ClusterDependencies, "sendClusterConnectUpdate"> = {
directoryForKubeConfigs: di.inject(directoryForKubeConfigsInjectable),
createKubeconfigManager: di.inject(createKubeconfigManagerInjectable),
createKubectl: di.inject(createKubectlInjectable),
@ -40,7 +41,10 @@ const createClusterInjectable = getInjectable({
loadConfigfromFile: di.inject(loadConfigfromFileInjectable),
};
return (model, configData) => new Cluster(dependencies, model, configData);
return (model, configData) => new Cluster({
...dependencies,
sendClusterConnectUpdate: di.inject(sendClusterConnectUpdateInjectable, model.id),
}, model, configData);
},
injectionToken: createClusterInjectionToken,

View File

@ -89,6 +89,7 @@ export function getDiForUnitTesting(opts: { doGeneralOverrides?: boolean } = {})
stderr: { on: jest.fn(), removeAllListeners: jest.fn() },
stdout: { on: jest.fn(), removeAllListeners: jest.fn() },
on: jest.fn(),
off: jest.fn(),
} as never;
});
}

View File

@ -3,37 +3,46 @@
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable } from "@ogre-tools/injectable";
import type { KubeAuthProxyDependencies } from "./kube-auth-proxy";
import { KubeAuthProxy } from "./kube-auth-proxy";
import type { Cluster } from "../../common/cluster/cluster";
import spawnInjectable from "../child-process/spawn.injectable";
import kubeAuthProxyCertificateInjectable from "./kube-auth-proxy-certificate.injectable";
import loggerInjectable from "../../common/logger.injectable";
import waitUntilPortIsUsedInjectable from "./wait-until-port-is-used/wait-until-port-is-used.injectable";
import lensK8sProxyPathInjectable from "./lens-k8s-proxy-path.injectable";
import getPortFromStreamInjectable from "../utils/get-port-from-stream.injectable";
import type { KubeAuthProxyProcess } from "./spawn-proxy.injectable";
import spawnKubeAuthProxyInjectable from "./spawn-proxy.injectable";
import type { IObservableValue } from "mobx";
import { observable } from "mobx";
import { getOrInsertWithObservable } from "../../common/utils";
export type CreateKubeAuthProxy = (cluster: Cluster, environmentVariables: NodeJS.ProcessEnv) => KubeAuthProxy;
export interface KubeAuthProxy {
readonly proxyProcess: IObservableValue<KubeAuthProxyProcess | undefined>;
run(): Promise<KubeAuthProxyProcess>;
exit(): void;
}
export type CreateKubeAuthProxy = (cluster: Cluster) => KubeAuthProxy;
const createKubeAuthProxyInjectable = getInjectable({
id: "create-kube-auth-proxy",
instantiate: (di): CreateKubeAuthProxy => {
const dependencies: Omit<KubeAuthProxyDependencies, "proxyCert"> = {
proxyBinPath: di.inject(lensK8sProxyPathInjectable),
spawn: di.inject(spawnInjectable),
logger: di.inject(loggerInjectable),
waitUntilPortIsUsed: di.inject(waitUntilPortIsUsedInjectable),
getPortFromStream: di.inject(getPortFromStreamInjectable),
};
const spawnKubeAuthProxy = di.inject(spawnKubeAuthProxyInjectable);
return (cluster: Cluster, environmentVariables: NodeJS.ProcessEnv) => {
const clusterUrl = new URL(cluster.apiUrl);
return (cluster) => {
const proxyProcess = observable.box<KubeAuthProxyProcess>();
let controller = new AbortController();
return new KubeAuthProxy({
...dependencies,
proxyCert: di.inject(kubeAuthProxyCertificateInjectable, clusterUrl.hostname),
}, cluster, environmentVariables);
return {
proxyProcess,
run: () => getOrInsertWithObservable(proxyProcess, async () => {
controller = new AbortController();
return spawnKubeAuthProxy(cluster, {
signal: controller.signal,
});
}),
exit: () => {
controller.abort();
proxyProcess.get()?.stop();
proxyProcess.set(undefined);
},
};
};
},
});

View File

@ -1,136 +0,0 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import type { ChildProcess } from "child_process";
import { randomBytes } from "crypto";
import type { Cluster } from "../../common/cluster/cluster";
import type { GetPortFromStream } from "../utils/get-port-from-stream.injectable";
import { makeObservable, observable, when } from "mobx";
import type { SelfSignedCert } from "selfsigned";
import assert from "assert";
import { TypedRegEx } from "typed-regex";
import type { Spawn } from "../child-process/spawn.injectable";
import type { Logger } from "../../common/logger";
import type { WaitUntilPortIsUsed } from "./wait-until-port-is-used/wait-until-port-is-used.injectable";
const startingServeMatcher = "starting to serve on (?<address>.+)";
const startingServeRegex = Object.assign(TypedRegEx(startingServeMatcher, "i"), {
rawMatcher: startingServeMatcher,
});
export interface KubeAuthProxyDependencies {
readonly proxyBinPath: string;
readonly proxyCert: SelfSignedCert;
readonly logger: Logger;
spawn: Spawn;
waitUntilPortIsUsed: WaitUntilPortIsUsed;
getPortFromStream: GetPortFromStream;
}
export class KubeAuthProxy {
public readonly apiPrefix = `/${randomBytes(8).toString("hex")}`;
public get port(): number {
const port = this._port;
assert(port, "port has not yet been initialized");
return port;
}
protected _port?: number;
protected proxyProcess?: ChildProcess;
@observable protected ready = false;
constructor(private readonly dependencies: KubeAuthProxyDependencies, protected readonly cluster: Cluster, protected readonly env: NodeJS.ProcessEnv) {
makeObservable(this);
}
get whenReady() {
return when(() => this.ready);
}
public async run(): Promise<void> {
if (this.proxyProcess) {
return this.whenReady;
}
const proxyBin = this.dependencies.proxyBinPath;
const cert = this.dependencies.proxyCert;
this.proxyProcess = this.dependencies.spawn(proxyBin, [], {
env: {
...this.env,
KUBECONFIG: this.cluster.kubeConfigPath,
KUBECONFIG_CONTEXT: this.cluster.contextName,
API_PREFIX: this.apiPrefix,
PROXY_KEY: cert.private,
PROXY_CERT: cert.cert,
},
});
this.proxyProcess.on("error", (error) => {
this.cluster.broadcastConnectUpdate(error.message, true);
this.exit();
});
this.proxyProcess.on("exit", (code) => {
this.cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? code > 0: false);
this.exit();
});
this.proxyProcess.on("disconnect", () => {
this.cluster.broadcastConnectUpdate("Proxy disconnected communications", true );
this.exit();
});
assert(this.proxyProcess.stderr);
assert(this.proxyProcess.stdout);
this.proxyProcess.stderr.on("data", (data: Buffer) => {
if (data.includes("http: TLS handshake error")) {
return;
}
this.cluster.broadcastConnectUpdate(data.toString(), true);
});
this.proxyProcess.stdout.on("data", (data: Buffer) => {
if (typeof this._port === "number") {
this.cluster.broadcastConnectUpdate(data.toString());
}
});
this._port = await this.dependencies.getPortFromStream(this.proxyProcess.stdout, {
lineRegex: startingServeRegex,
onFind: () => this.cluster.broadcastConnectUpdate("Authentication proxy started"),
});
this.dependencies.logger.info(`[KUBE-AUTH-PROXY]: found port=${this._port}`);
try {
await this.dependencies.waitUntilPortIsUsed(this.port, 500, 10000);
this.ready = true;
} catch (error) {
this.dependencies.logger.warn("[KUBE-AUTH-PROXY]: waitUntilUsed failed", error);
this.cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", true);
this.exit();
return this.run();
}
}
public exit() {
this.ready = false;
if (this.proxyProcess) {
this.dependencies.logger.debug("[KUBE-AUTH]: stopping local proxy", this.cluster.getMeta());
this.proxyProcess.removeAllListeners();
this.proxyProcess.stderr?.removeAllListeners();
this.proxyProcess.stdout?.removeAllListeners();
this.proxyProcess.kill();
this.proxyProcess = undefined;
}
}
}

View File

@ -0,0 +1,13 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getGlobalOverride } from "../../common/test-utils/get-global-override";
import spawnKubeAuthProxyInjectable from "./spawn-proxy.injectable";
export default getGlobalOverride(spawnKubeAuthProxyInjectable, () => async (cluster) => ({
apiPrefix: `/some-api-prefix-for-${cluster.id}`,
port: 4233,
stop: () => {},
}));

View File

@ -0,0 +1,133 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable } from "@ogre-tools/injectable";
import { randomBytes } from "crypto";
import process from "process";
import { TypedRegEx } from "typed-regex";
import type { Cluster } from "../../common/cluster/cluster";
import loggerInjectable from "../../common/logger.injectable";
import spawnInjectable from "../child-process/spawn.injectable";
import { getProxyEnv } from "../cluster/get-proxy-env";
import kubeAuthProxyCertificateInjectable from "./kube-auth-proxy-certificate.injectable";
import lensK8sProxyPathInjectable from "./lens-k8s-proxy-path.injectable";
import waitUntilPortIsUsedInjectable from "../../common/utils/wait-until-port-is-used/wait-until-port-is-used.injectable";
import getPortFromStreamInjectable from "../utils/get-port-from-stream.injectable";
export interface KubeAuthProxyProcess {
readonly port: number;
readonly apiPrefix: string;
stop: () => void;
}
export interface SpawnKubeAuthProxyArgs {
signal: AbortSignal;
}
export type SpawnKubeAuthProxy = (cluster: Cluster, args: SpawnKubeAuthProxyArgs) => Promise<KubeAuthProxyProcess>;
const startingServeMatcher = "starting to serve on (?<address>.+)";
const startingServeRegex = Object.assign(TypedRegEx(startingServeMatcher, "i"), {
rawMatcher: startingServeMatcher,
});
const spawnKubeAuthProxyInjectable = getInjectable({
id: "spawn-kube-auth-proxy",
instantiate: (di): SpawnKubeAuthProxy => {
const spawn = di.inject(spawnInjectable);
const lensK8sProxyPath = di.inject(lensK8sProxyPathInjectable);
const logger = di.inject(loggerInjectable);
const waitUntilPortIsUsed = di.inject(waitUntilPortIsUsedInjectable);
const getPortFromStream = di.inject(getPortFromStreamInjectable);
return async (...params) => {
const [cluster, { signal }] = params;
const clusterUrl = new URL(cluster.apiUrl);
const apiPrefix = `/${randomBytes(8).toString("hex")}`;
const proxyCert = di.inject(kubeAuthProxyCertificateInjectable, clusterUrl.hostname);
const attemptToStart = async (): Promise<KubeAuthProxyProcess> => {
let port: number | undefined = undefined;
const proxyProcess = spawn(lensK8sProxyPath, [], {
env: {
...process.env,
...getProxyEnv(cluster),
KUBECONFIG: cluster.kubeConfigPath,
KUBECONFIG_CONTEXT: cluster.contextName,
API_PREFIX: apiPrefix,
PROXY_KEY: proxyCert.private,
PROXY_CERT: proxyCert.cert,
},
signal,
});
const stopProxyProcess = () => {
logger.debug("[KUBE-AUTH]: stopping local proxy", cluster.getMeta());
proxyProcess.removeAllListeners();
proxyProcess.stderr?.removeAllListeners();
proxyProcess.stdout?.removeAllListeners();
proxyProcess.kill();
};
proxyProcess
.on("error", (error) => {
cluster.broadcastConnectUpdate(error.message, true);
stopProxyProcess();
})
.on("exit", (code) => {
cluster.broadcastConnectUpdate(`proxy exited with code: ${code}`, code ? code > 0: false);
stopProxyProcess();
})
.on("disconnect", () => {
cluster.broadcastConnectUpdate("Proxy disconnected communications", true );
stopProxyProcess();
});
proxyProcess.stderr.on("data", (data: Buffer) => {
if (data.includes("http: TLS handshake error")) {
return;
}
cluster.broadcastConnectUpdate(data.toString(), true);
});
proxyProcess.stdout.on("data", (data: Buffer) => {
if (typeof port === "number") {
cluster.broadcastConnectUpdate(data.toString());
}
});
port = await getPortFromStream(proxyProcess.stdout, {
lineRegex: startingServeRegex,
onFind: () => cluster.broadcastConnectUpdate("Authentication proxy started"),
});
logger.info(`[KUBE-AUTH-PROXY]: found port=${port}`);
try {
await waitUntilPortIsUsed(port, 500, 10_000);
} catch (error) {
logger.warn("[KUBE-AUTH-PROXY]: waitUntilUsed failed", error);
cluster.broadcastConnectUpdate("Proxy port failed to be used within timelimit, restarting...", true);
stopProxyProcess();
return attemptToStart();
}
return {
port,
stop: stopProxyProcess,
apiPrefix,
};
};
return attemptToStart();
};
},
causesSideEffects: true,
});
export default spawnKubeAuthProxyInjectable;

View File

@ -32,6 +32,7 @@ const createClusterInjectable = getInjectable({
requestApiResources: ()=> { throw new Error("Tried to access back-end feature in front-end."); },
detectorRegistry: undefined as never,
createVersionDetector: () => { throw new Error("Tried to access back-end feature in front-end."); },
sendClusterConnectUpdate: () => { throw new Error("Tried to access back-end feature in front-end."); },
};
return (model, configData) => new Cluster(dependencies, model, configData);