mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
cluster-store refactoring -- part 3
Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
parent
bde60b1625
commit
f474a20904
@ -3,7 +3,7 @@ import Config from "conf"
|
||||
import { Options as ConfOptions } from "conf/dist/source/types"
|
||||
import produce from "immer";
|
||||
import { app, remote } from "electron"
|
||||
import { observable, reaction, toJS, when } from "mobx";
|
||||
import { action, observable, reaction, toJS, when } from "mobx";
|
||||
import Singleton from "./utils/singleton";
|
||||
import isEqual from "lodash/isEqual"
|
||||
import { getAppVersion } from "./utils/app-version";
|
||||
@ -65,7 +65,6 @@ export class BaseStore<T = any> extends Singleton {
|
||||
configName: configName,
|
||||
watch: syncEnabled, // watch for changes in multi-process app (e.g. main/renderer)
|
||||
get cwd() {
|
||||
// todo: remove remote.app in favor ipc.invoke
|
||||
return (app || remote.app).getPath("userData");
|
||||
},
|
||||
...confOptions,
|
||||
@ -111,11 +110,12 @@ export class BaseStore<T = any> extends Singleton {
|
||||
}
|
||||
}
|
||||
|
||||
// todo: use "serializr" ?
|
||||
@action
|
||||
protected fromStore(data: Partial<T> = {}) {
|
||||
Object.assign(this.data, data);
|
||||
}
|
||||
|
||||
@action
|
||||
merge(updater: (modelDraft: T) => void) {
|
||||
this.data = produce(this.data, updater);
|
||||
}
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
import { action, computed, toJS } from "mobx";
|
||||
import migrations from "../migrations/cluster-store"
|
||||
import { action, observable, toJS } from "mobx";
|
||||
import { v4 as uuid } from "uuid"
|
||||
import { BaseStore } from "./base-store";
|
||||
import { Cluster } from "../main/cluster";
|
||||
import migrations from "../migrations/cluster-store"
|
||||
|
||||
export interface ClusterStoreModel {
|
||||
clusters: ClusterModel[]
|
||||
@ -13,8 +14,8 @@ export interface ClusterModel {
|
||||
id: ClusterId;
|
||||
contextName: string;
|
||||
kubeConfigPath: string;
|
||||
kubeConfig?: string;
|
||||
port?: number;
|
||||
kubeConfig?: string;
|
||||
workspace?: string;
|
||||
preferences?: ClusterPreferences;
|
||||
}
|
||||
@ -36,6 +37,8 @@ export interface ClusterPreferences {
|
||||
}
|
||||
|
||||
export class ClusterStore extends BaseStore<ClusterStoreModel> {
|
||||
@observable clusters = observable.map<ClusterId, Cluster>();
|
||||
|
||||
private constructor() {
|
||||
super({
|
||||
configName: "lens-cluster-store",
|
||||
@ -46,60 +49,51 @@ export class ClusterStore extends BaseStore<ClusterStoreModel> {
|
||||
});
|
||||
}
|
||||
|
||||
// setup initial value
|
||||
protected data: ClusterStoreModel = {
|
||||
clusters: [],
|
||||
getById(id: ClusterId): Cluster {
|
||||
return this.clusters.get(id);
|
||||
}
|
||||
|
||||
@computed get clusters(): Cluster[] {
|
||||
return toJS(this.data.clusters).map(model => new Cluster(model));
|
||||
getByWorkspaceId(workspaceId: string): Cluster[] {
|
||||
return Array.from(this.clusters.values()).filter(cluster => {
|
||||
return cluster.workspace === workspaceId;
|
||||
})
|
||||
}
|
||||
|
||||
@computed get clustersMap(): Map<string, Cluster> {
|
||||
return this.clusters.reduce((map, cluster) => {
|
||||
map.set(cluster.id, cluster);
|
||||
return map;
|
||||
}, new Map);
|
||||
}
|
||||
|
||||
getById(clusterId: ClusterId): Cluster {
|
||||
return this.clusters.find(cluster => cluster.id === clusterId)
|
||||
}
|
||||
|
||||
getIndexById(clusterId: ClusterId): number {
|
||||
return this.clusters.findIndex(cluster => cluster.id === clusterId)
|
||||
@action
|
||||
addCluster(model: ClusterModel): Cluster {
|
||||
const id = model.id || uuid();
|
||||
const cluster = new Cluster({ ...model, id })
|
||||
this.clusters.set(id, cluster);
|
||||
return cluster;
|
||||
}
|
||||
|
||||
@action
|
||||
removeById(clusterId: ClusterId): void {
|
||||
const index = this.getIndexById(clusterId);
|
||||
if (index > -1) {
|
||||
this.data.clusters.splice(index, 1);
|
||||
}
|
||||
this.clusters.delete(clusterId);
|
||||
}
|
||||
|
||||
@action
|
||||
removeAllByWorkspaceId(workspaceId: string) {
|
||||
this.clusters.forEach(cluster => {
|
||||
if (cluster.workspace === workspaceId) {
|
||||
this.removeById(cluster.id)
|
||||
removeByWorkspaceId(workspaceId: string) {
|
||||
this.getByWorkspaceId(workspaceId).forEach(cluster => {
|
||||
this.removeById(cluster.id)
|
||||
})
|
||||
}
|
||||
|
||||
@action
|
||||
protected fromStore({ clusters = [] }: Partial<ClusterStoreModel> = {}) {
|
||||
// fixme: handle clusters update + delete
|
||||
clusters.forEach(model => {
|
||||
if (!this.clusters.has(model.id)) {
|
||||
this.clusters.set(model.id, new Cluster(model));
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
toJSON(): ClusterStoreModel {
|
||||
const clusters: ClusterModel[] = this.clusters.map(cluster => {
|
||||
return {
|
||||
id: cluster.id,
|
||||
contextName: cluster.contextName,
|
||||
kubeConfigPath: cluster.kubeConfigPath,
|
||||
preferences: cluster.preferences,
|
||||
workspace: cluster.workspace,
|
||||
}
|
||||
const clusters = Array.from(this.clusters).map(([id, cluster]) => cluster.toJSON());
|
||||
return toJS({ clusters }, {
|
||||
recurseEverything: true
|
||||
})
|
||||
return {
|
||||
clusters
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -9,16 +9,15 @@ export interface IpcOptions {
|
||||
timeout?: number;
|
||||
}
|
||||
|
||||
export async function sendMessage(channel: string, ...args: any[]) {
|
||||
logger.debug(`[IPC]: invoke "${channel}" with arguments`, args);
|
||||
export async function invokeMessage(channel: string, ...args: any[]) {
|
||||
logger.debug(`[IPC]: invoke channel "${channel}"`, { args });
|
||||
return ipcRenderer.invoke(channel, ...args);
|
||||
}
|
||||
|
||||
// todo: maybe spawn callback in separate thread/worker
|
||||
export function onMessage<T = any>(channel: string, callback: (...args: any[]) => T, options: IpcOptions = {}) {
|
||||
export function onMessage(channel: string, handler: (...args: any[]) => any, options: IpcOptions = {}) {
|
||||
const { timeout = 0 } = options;
|
||||
ipcMain.handle(channel, async (event, ...args: any[]) => {
|
||||
logger.debug(`[IPC]: handle "${channel}"`, event, args);
|
||||
logger.debug(`[IPC]: handle "${channel}"`, { event, args });
|
||||
return new Promise(async (resolve, reject) => {
|
||||
let timerId;
|
||||
if (timeout) {
|
||||
@ -28,7 +27,7 @@ export function onMessage<T = any>(channel: string, callback: (...args: any[]) =
|
||||
}, timeout);
|
||||
}
|
||||
try {
|
||||
const result = await callback(...args);
|
||||
const result = await handler(...args); // todo: maybe exec in separate thread/worker
|
||||
clearTimeout(timerId);
|
||||
return result;
|
||||
} catch (err) {
|
||||
@ -37,3 +36,9 @@ export function onMessage<T = any>(channel: string, callback: (...args: any[]) =
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export function onMessages(messages: Record<string, Function>, options?: IpcOptions) {
|
||||
Object.entries(messages).forEach(([channel, handler]) => {
|
||||
this.onMessage(channel, handler, options);
|
||||
})
|
||||
}
|
||||
|
||||
15
src/common/ipc-messages.ts
Normal file
15
src/common/ipc-messages.ts
Normal file
@ -0,0 +1,15 @@
|
||||
// IPC messages (all channels)
|
||||
|
||||
export enum ClusterIpcMessage {
|
||||
CLUSTER_ADD = "cluster-add",
|
||||
CLUSTER_STOP = "cluster-stop",
|
||||
CLUSTER_REFRESH = "cluster-refresh",
|
||||
CLUSTER_REMOVE = "cluster-remove",
|
||||
CLUSTER_REMOVE_WORKSPACE = "cluster-remove-all-from-workspace",
|
||||
CLUSTER_EVENTS = "cluster-events-count",
|
||||
FEATURE_INSTALL = "cluster-feature-install",
|
||||
FEATURE_UPGRADE = "cluster-feature-upgrade",
|
||||
FEATURE_REMOVE = "cluster-feature-remove",
|
||||
ICON_SAVE = "cluster-icon-save",
|
||||
ICON_RESET = "cluster-icon-reset",
|
||||
}
|
||||
@ -62,7 +62,7 @@ export class WorkspaceStore extends BaseStore<WorkspaceStoreModel> {
|
||||
const index = this.getIndexById(workspace.id);
|
||||
if (index > -1) {
|
||||
this.data.workspaces.splice(index, 1)
|
||||
clusterStore.removeAllByWorkspaceId(workspace.id)
|
||||
clusterStore.removeByWorkspaceId(workspace.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,268 +1,88 @@
|
||||
import { KubeConfig } from "@kubernetes/client-node"
|
||||
import { PromiseIpc } from "electron-promise-ipc"
|
||||
import http from "http"
|
||||
import { Cluster } from "./cluster"
|
||||
import { ClusterModel, clusterStore } from "../common/cluster-store"
|
||||
import * as k8s from "./k8s"
|
||||
import logger from "./logger"
|
||||
import { LensProxy } from "./proxy"
|
||||
import { autorun } from "mobx";
|
||||
import { apiPrefix } from "../common/vars";
|
||||
import { app } from "electron"
|
||||
import path from "path"
|
||||
import { promises } from "fs"
|
||||
import { ensureDir } from "fs-extra"
|
||||
import http from "http"
|
||||
import { copyFile, ensureDir } from "fs-extra"
|
||||
import filenamify from "filenamify"
|
||||
import { v4 as uuid } from "uuid"
|
||||
import { apiPrefix } from "../common/vars";
|
||||
import { validateConfig } from "./k8s";
|
||||
import { Cluster } from "./cluster"
|
||||
import { ClusterId, ClusterModel, clusterStore } from "../common/cluster-store"
|
||||
import logger from "./logger"
|
||||
import { onMessages } from "../common/ipc-helpers";
|
||||
import { ClusterIpcMessage } from "../common/ipc-messages";
|
||||
import { FeatureInstallRequest } from "./feature";
|
||||
|
||||
// todo: refactor + reuse parts of cluster-store more heavily
|
||||
|
||||
export type FeatureInstallRequest = {
|
||||
name: string;
|
||||
export interface ClusterIconUpload {
|
||||
clusterId: string;
|
||||
config: any;
|
||||
}
|
||||
|
||||
export type FeatureInstallResponse = {
|
||||
success: boolean;
|
||||
message: string;
|
||||
}
|
||||
|
||||
export type ClusterIconUpload = {
|
||||
name: string;
|
||||
path: string;
|
||||
name: string;
|
||||
clusterId: string;
|
||||
}
|
||||
|
||||
export class ClusterManager {
|
||||
static get clusterIconDir(){
|
||||
return path.join(app.getPath("userData"), "icons")
|
||||
static get clusterIconDir() {
|
||||
return path.join(app.getPath("userData"), "icons");
|
||||
}
|
||||
|
||||
protected promiseIpc: any
|
||||
protected proxyServer: LensProxy
|
||||
protected port: number
|
||||
protected clusters: Map<string, Cluster>;
|
||||
|
||||
constructor(clusters: Cluster[], port: number) {
|
||||
this.promiseIpc = new PromiseIpc({ timeout: 2000 })
|
||||
this.port = port
|
||||
this.clusters = new Map()
|
||||
clusters.forEach((clusterInfo) => {
|
||||
try {
|
||||
const kc = this.loadKubeConfig(clusterInfo.kubeConfigPath)
|
||||
const cluster = new Cluster({
|
||||
id: clusterInfo.id,
|
||||
port: this.port,
|
||||
kubeConfigPath: clusterInfo.kubeConfigPath,
|
||||
contextName: clusterInfo.contextName,
|
||||
preferences: clusterInfo.preferences,
|
||||
workspace: clusterInfo.workspace
|
||||
})
|
||||
cluster.init(kc)
|
||||
logger.debug(`Created cluster[id: ${ cluster.id }] for context ${ cluster.contextName }`)
|
||||
this.clusters.set(cluster.id, cluster)
|
||||
} catch(error) {
|
||||
logger.error(`Error while initializing ${clusterInfo.contextName}`)
|
||||
}
|
||||
});
|
||||
logger.debug("clusters after constructor:" + this.clusters.size)
|
||||
this.listenEvents()
|
||||
}
|
||||
|
||||
public getClusters() {
|
||||
return this.clusters.values()
|
||||
}
|
||||
|
||||
public getCluster(id: string) {
|
||||
return this.clusters.get(id)
|
||||
}
|
||||
|
||||
public stop() {
|
||||
const clusters = Array.from(this.getClusters())
|
||||
clusters.map(cluster => cluster.stopServer())
|
||||
}
|
||||
|
||||
protected loadKubeConfig(configPath: string): KubeConfig {
|
||||
const kc = new KubeConfig();
|
||||
kc.loadFromFile(configPath)
|
||||
return kc;
|
||||
}
|
||||
|
||||
protected async addNewCluster(clusterData: ClusterModel): Promise<Cluster> {
|
||||
return new Promise(async (resolve, reject) => {
|
||||
try {
|
||||
const kc = this.loadKubeConfig(clusterData.kubeConfigPath)
|
||||
k8s.validateConfig(kc)
|
||||
kc.setCurrentContext(clusterData.contextName)
|
||||
const cluster = new Cluster({
|
||||
id: uuid(),
|
||||
port: this.port,
|
||||
kubeConfigPath: clusterData.kubeConfigPath,
|
||||
contextName: clusterData.contextName,
|
||||
preferences: clusterData.preferences,
|
||||
workspace: clusterData.workspace
|
||||
})
|
||||
cluster.init(kc)
|
||||
cluster.save()
|
||||
this.clusters.set(cluster.id, cluster)
|
||||
resolve(cluster)
|
||||
|
||||
} catch(error) {
|
||||
logger.error(error)
|
||||
reject(error)
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
protected listenEvents() {
|
||||
this.promiseIpc.on("addCluster", async (clusterData: ClusterModel) => {
|
||||
logger.debug(`IPC: addCluster`)
|
||||
const cluster = await this.addNewCluster(clusterData)
|
||||
return {
|
||||
addedCluster: cluster.toClusterInfo(),
|
||||
allClusters: Array.from(this.getClusters()).map((cluster: Cluster) => cluster.toClusterInfo())
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("getClusters", async (workspaceId: string) => {
|
||||
logger.debug(`IPC: getClusters, workspace ${workspaceId}`)
|
||||
const workspaceClusters = Array.from(this.getClusters()).filter((cluster) => cluster.workspace === workspaceId)
|
||||
return workspaceClusters.map((cluster: Cluster) => cluster.toClusterInfo())
|
||||
});
|
||||
|
||||
this.promiseIpc.on("getCluster", async (id: string) => {
|
||||
logger.debug(`IPC: getCluster`)
|
||||
const cluster = this.getCluster(id)
|
||||
if (cluster) {
|
||||
await cluster.refreshCluster()
|
||||
return cluster.toClusterInfo()
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("installFeature", async (installReq: FeatureInstallRequest) => {
|
||||
logger.debug(`IPC: installFeature for ${installReq.name}`)
|
||||
const cluster = this.clusters.get(installReq.clusterId)
|
||||
try {
|
||||
await cluster.installFeature(installReq.name, installReq.config)
|
||||
return {success: true, message: ""}
|
||||
} catch(error) {
|
||||
return {success: false, message: error}
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("upgradeFeature", async (installReq: FeatureInstallRequest) => {
|
||||
logger.debug(`IPC: upgradeFeature for ${installReq.name}`)
|
||||
const cluster = this.clusters.get(installReq.clusterId)
|
||||
try {
|
||||
await cluster.upgradeFeature(installReq.name, installReq.config)
|
||||
return {success: true, message: ""}
|
||||
} catch(error) {
|
||||
return {success: false, message: error}
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("uninstallFeature", async (installReq: FeatureInstallRequest) => {
|
||||
logger.debug(`IPC: uninstallFeature for ${installReq.name}`)
|
||||
const cluster = this.clusters.get(installReq.clusterId)
|
||||
|
||||
await cluster.uninstallFeature(installReq.name)
|
||||
return {success: true, message: ""}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("saveClusterIcon", async (fileUpload: ClusterIconUpload) => {
|
||||
logger.debug(`IPC: saveClusterIcon for ${fileUpload.clusterId}`)
|
||||
const cluster = this.getCluster(fileUpload.clusterId)
|
||||
if (!cluster) {
|
||||
return {success: false, message: "Cluster not found"}
|
||||
}
|
||||
try {
|
||||
const clusterIcon = await this.uploadClusterIcon(cluster, fileUpload.name, fileUpload.path)
|
||||
// clusterStore.reloadCluster(cluster);
|
||||
if(!cluster.preferences) cluster.preferences = {};
|
||||
cluster.preferences.icon = clusterIcon
|
||||
// clusterStore.saveCluster(cluster);
|
||||
return {success: true, cluster: cluster.toClusterInfo(), message: ""}
|
||||
} catch(error) {
|
||||
return {success: false, message: error}
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("resetClusterIcon", async (id: string) => {
|
||||
logger.debug(`IPC: resetClusterIcon`)
|
||||
const cluster = this.getCluster(id)
|
||||
if (cluster && cluster.preferences) {
|
||||
cluster.preferences.icon = null;
|
||||
// clusterStore.saveCluster(cluster)
|
||||
return {success: true, cluster: cluster.toClusterInfo(), message: ""}
|
||||
} else {
|
||||
return {success: false, message: "Cluster not found"}
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("refreshCluster", async (clusterId: string) => {
|
||||
const cluster = this.clusters.get(clusterId)
|
||||
await cluster.refreshCluster()
|
||||
return cluster.toClusterInfo()
|
||||
});
|
||||
|
||||
this.promiseIpc.on("stopCluster", (clusterId: string) => {
|
||||
logger.debug(`IPC: stopCluster: ${clusterId}`)
|
||||
const cluster = this.clusters.get(clusterId)
|
||||
if (cluster) {
|
||||
cluster.stopServer()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
});
|
||||
|
||||
this.promiseIpc.on("removeCluster", (ctx: string) => {
|
||||
logger.debug(`IPC: removeCluster: ${ctx}`)
|
||||
return this.removeCluster(ctx).map((cluster: Cluster) => cluster.toClusterInfo())
|
||||
});
|
||||
|
||||
this.promiseIpc.on("clusterStored", (clusterId: string) => {
|
||||
logger.debug(`IPC: clusterStored: ${clusterId}`)
|
||||
const cluster = this.clusters.get(clusterId)
|
||||
if (cluster) {
|
||||
// clusterStore.reloadCluster(cluster);
|
||||
cluster.stopServer()
|
||||
}
|
||||
});
|
||||
|
||||
this.promiseIpc.on("preferencesSaved", () => {
|
||||
logger.debug(`IPC: preferencesSaved`)
|
||||
this.clusters.forEach((cluster) => {
|
||||
cluster.stopServer()
|
||||
constructor(protected port: number) {
|
||||
autorun(() => {
|
||||
clusterStore.clusters.forEach((cluster: Cluster) => {
|
||||
if (!cluster.initialized) {
|
||||
cluster.init(this.port);
|
||||
cluster.refreshCluster();
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
this.promiseIpc.on("getClusterEvents", async (clusterId: string) => {
|
||||
const cluster = this.clusters.get(clusterId)
|
||||
return cluster.getEventCount();
|
||||
});
|
||||
|
||||
this.listenIpcEvents();
|
||||
}
|
||||
|
||||
public removeCluster(id: string): Cluster[] {
|
||||
const cluster = this.clusters.get(id)
|
||||
stop() {
|
||||
clusterStore.clusters.forEach((cluster: Cluster) => {
|
||||
cluster.stopServer();
|
||||
})
|
||||
}
|
||||
|
||||
protected getCluster(id: ClusterId) {
|
||||
return clusterStore.getById(id);
|
||||
}
|
||||
|
||||
protected async addCluster(clusterModel: ClusterModel): Promise<Cluster> {
|
||||
try {
|
||||
await validateConfig(clusterModel.kubeConfigPath);
|
||||
return clusterStore.addCluster({
|
||||
...clusterModel,
|
||||
port: this.port,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(`[CLUSTER-MANAGER]: add cluster error ${JSON.stringify(error)}`)
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
protected removeAllByWorkspace(workspaceId: string) {
|
||||
const clusters = clusterStore.getByWorkspaceId(workspaceId);
|
||||
clusters.forEach(cluster => {
|
||||
this.removeCluster(cluster.id);
|
||||
});
|
||||
}
|
||||
|
||||
protected removeCluster(clusterId: string): Cluster {
|
||||
const cluster = this.getCluster(clusterId);
|
||||
if (cluster) {
|
||||
cluster.stopServer()
|
||||
clusterStore.removeById(cluster.id);
|
||||
this.clusters.delete(cluster.id)
|
||||
return cluster;
|
||||
}
|
||||
return Array.from(this.clusters.values())
|
||||
}
|
||||
|
||||
public getClusterForRequest(req: http.IncomingMessage): Cluster {
|
||||
getClusterForRequest(req: http.IncomingMessage): Cluster {
|
||||
let cluster: Cluster = null
|
||||
|
||||
// lens-server is connecting to 127.0.0.1:<port>/<uid>
|
||||
if (req.headers.host.startsWith("127.0.0.1")) {
|
||||
const clusterId = req.url.split("/")[1]
|
||||
if (clusterId) {
|
||||
cluster = this.clusters.get(clusterId)
|
||||
cluster = this.getCluster(clusterId)
|
||||
if (cluster) {
|
||||
// we need to swap path prefix so that request is proxied to kube api
|
||||
req.url = req.url.replace(`/${clusterId}`, apiPrefix.KUBE_BASE)
|
||||
@ -270,7 +90,7 @@ export class ClusterManager {
|
||||
}
|
||||
} else {
|
||||
const id = req.headers.host.split(".")[0]
|
||||
cluster = this.clusters.get(id)
|
||||
cluster = this.getCluster(id)
|
||||
}
|
||||
|
||||
return cluster;
|
||||
@ -280,7 +100,54 @@ export class ClusterManager {
|
||||
await ensureDir(ClusterManager.clusterIconDir)
|
||||
fileName = filenamify(cluster.contextName + "-" + fileName)
|
||||
const dest = path.join(ClusterManager.clusterIconDir, fileName)
|
||||
await promises.copyFile(src, dest)
|
||||
await copyFile(src, dest)
|
||||
return "store:///icons/" + fileName
|
||||
}
|
||||
|
||||
protected listenIpcEvents() {
|
||||
onMessages({
|
||||
[ClusterIpcMessage.CLUSTER_ADD]: async (model: ClusterModel): Promise<boolean> => {
|
||||
await this.addCluster(model);
|
||||
return true;
|
||||
},
|
||||
[ClusterIpcMessage.CLUSTER_STOP]: (clusterId: ClusterId) => {
|
||||
this.getCluster(clusterId)?.stopServer();
|
||||
},
|
||||
[ClusterIpcMessage.CLUSTER_REFRESH]: (clusterId: ClusterId) => {
|
||||
this.getCluster(clusterId)?.refreshCluster();
|
||||
},
|
||||
[ClusterIpcMessage.CLUSTER_REMOVE]: (clusterId: ClusterId) => {
|
||||
this.removeCluster(clusterId);
|
||||
},
|
||||
[ClusterIpcMessage.CLUSTER_REMOVE_WORKSPACE]: (workspaceId: ClusterId) => {
|
||||
this.removeAllByWorkspace(workspaceId);
|
||||
},
|
||||
[ClusterIpcMessage.CLUSTER_EVENTS]: async (clusterId: ClusterId): Promise<number> => {
|
||||
return await this.getCluster(clusterId)?.getEventCount() || 0;
|
||||
},
|
||||
// todo: check feature failures
|
||||
[ClusterIpcMessage.FEATURE_INSTALL]: ({ clusterId, name, config }: FeatureInstallRequest) => {
|
||||
return this.getCluster(clusterId)?.installFeature(name, config)
|
||||
},
|
||||
[ClusterIpcMessage.FEATURE_UPGRADE]: ({ clusterId, name, config }: FeatureInstallRequest) => {
|
||||
return this.getCluster(clusterId)?.upgradeFeature(name, config)
|
||||
},
|
||||
[ClusterIpcMessage.FEATURE_REMOVE]: ({ clusterId, name }: FeatureInstallRequest) => {
|
||||
return this.getCluster(clusterId)?.uninstallFeature(name);
|
||||
},
|
||||
[ClusterIpcMessage.ICON_SAVE]: async ({ clusterId, name, path }: ClusterIconUpload) => {
|
||||
const cluster = this.getCluster(clusterId);
|
||||
if (!cluster) return false;
|
||||
cluster.preferences.icon = await this.uploadClusterIcon(cluster, name, path);
|
||||
},
|
||||
// todo: remove current file icon ?
|
||||
[ClusterIpcMessage.ICON_RESET]: async (clusterId: ClusterId) => {
|
||||
const cluster = this.getCluster(clusterId);
|
||||
if (!cluster) return false;
|
||||
cluster.preferences.icon = null;
|
||||
},
|
||||
}, {
|
||||
timeout: 2000,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,15 +1,15 @@
|
||||
import { observable } from "mobx";
|
||||
import { apiPrefix } from "../common/vars";
|
||||
import { ContextHandler } from "./context-handler"
|
||||
import { FeatureStatusMap } from "./feature"
|
||||
import * as k8s from "./k8s"
|
||||
import { ClusterId, ClusterModel, ClusterPreferences } from "../common/cluster-store"
|
||||
import logger from "./logger"
|
||||
import { AuthorizationV1Api, CoreV1Api, KubeConfig, V1ResourceAttributes } from "@kubernetes/client-node"
|
||||
import * as fm from "./feature-manager";
|
||||
import { Kubectl } from "./kubectl";
|
||||
import { KubeconfigManager } from "./kubeconfig-manager"
|
||||
import { getNodeWarningConditions, loadConfig, podHasIssues } from "./k8s"
|
||||
import { getFeatures, installFeature, uninstallFeature, upgradeFeature } from "./feature-manager";
|
||||
import type { ClusterId, ClusterModel, ClusterPreferences } from "../common/cluster-store"
|
||||
import request from "request-promise-native"
|
||||
import { apiPrefix } from "../common/vars";
|
||||
import type { ClusterInfo } from "../renderer/_vue/store/modules/clusters";
|
||||
import logger from "./logger"
|
||||
|
||||
enum ClusterStatus {
|
||||
AccessGranted = 2,
|
||||
@ -18,10 +18,14 @@ enum ClusterStatus {
|
||||
}
|
||||
|
||||
export class Cluster implements ClusterModel {
|
||||
public id: ClusterId;
|
||||
public workspace: string;
|
||||
@observable initialized = false;
|
||||
@observable id: ClusterId;
|
||||
@observable workspace: string;
|
||||
@observable kubeConfigPath: string;
|
||||
@observable contextName: string;
|
||||
@observable preferences: ClusterPreferences = {};
|
||||
|
||||
public contextHandler: ContextHandler;
|
||||
public contextName: string;
|
||||
public url: string;
|
||||
public port: number;
|
||||
public apiUrl: string;
|
||||
@ -32,60 +36,35 @@ export class Cluster implements ClusterModel {
|
||||
public version: string;
|
||||
public distribution: string;
|
||||
public isAdmin: boolean;
|
||||
public features: FeatureStatusMap;
|
||||
public kubeCtl: Kubectl
|
||||
public kubeConfigPath: string;
|
||||
public eventCount: number;
|
||||
public preferences: ClusterPreferences;
|
||||
public kubeCtl: Kubectl
|
||||
public features: FeatureStatusMap = {};
|
||||
|
||||
protected eventPoller: NodeJS.Timeout;
|
||||
protected kubeconfigManager: KubeconfigManager;
|
||||
|
||||
constructor(model: ClusterModel) {
|
||||
if (model) Object.assign(this, model)
|
||||
if (!this.preferences) this.preferences = {}
|
||||
Object.assign(this, model)
|
||||
}
|
||||
|
||||
public proxyKubeconfigPath() {
|
||||
return this.kubeconfigManager.getPath()
|
||||
async init(port: number) {
|
||||
const { contextName } = this
|
||||
try {
|
||||
const kubeConfig = loadConfig(this.kubeConfigPath)
|
||||
kubeConfig.setCurrentContext(contextName); // fixme: is it needed at all?
|
||||
this.port = port;
|
||||
this.apiUrl = kubeConfig.getCurrentCluster().server
|
||||
this.contextHandler = new ContextHandler(kubeConfig, this)
|
||||
await this.contextHandler.init() // So we get the proxy port reserved
|
||||
this.kubeconfigManager = new KubeconfigManager(this)
|
||||
this.url = this.contextHandler.url
|
||||
this.initialized = true;
|
||||
logger.debug(`[CLUSTER]: init done for "${this.id}", context ${contextName}`);
|
||||
} catch (err) {
|
||||
logger.error(`[CLUSTER]: init "${this.id}" has failed`, { err, contextName });
|
||||
}
|
||||
}
|
||||
|
||||
public proxyKubeconfig() {
|
||||
const kc = new KubeConfig()
|
||||
kc.loadFromFile(this.proxyKubeconfigPath())
|
||||
return kc
|
||||
}
|
||||
|
||||
public async init(kc: KubeConfig) {
|
||||
this.apiUrl = kc.getCurrentCluster().server
|
||||
this.contextHandler = new ContextHandler(kc, this)
|
||||
await this.contextHandler.init() // So we get the proxy port reserved
|
||||
this.kubeconfigManager = new KubeconfigManager(this)
|
||||
this.url = this.contextHandler.url
|
||||
}
|
||||
|
||||
public stopServer() {
|
||||
this.contextHandler.stopServer()
|
||||
clearInterval(this.eventPoller);
|
||||
}
|
||||
|
||||
public async installFeature(name: string, config: any) {
|
||||
await fm.installFeature(name, this, config)
|
||||
return this.refreshCluster()
|
||||
}
|
||||
|
||||
public async upgradeFeature(name: string, config: any) {
|
||||
await fm.upgradeFeature(name, this, config)
|
||||
return this.refreshCluster()
|
||||
}
|
||||
|
||||
public async uninstallFeature(name: string) {
|
||||
await fm.uninstallFeature(name, this)
|
||||
return this.refreshCluster()
|
||||
}
|
||||
|
||||
public async refreshCluster() {
|
||||
// clusterStore.reloadCluster(this)
|
||||
async refreshCluster() {
|
||||
this.contextHandler.setClusterPreferences(this.preferences)
|
||||
|
||||
const connectionStatus = await this.getConnectionStatus()
|
||||
@ -94,7 +73,7 @@ export class Cluster implements ClusterModel {
|
||||
|
||||
if (this.accessible) {
|
||||
this.distribution = this.detectKubernetesDistribution(this.version)
|
||||
this.features = await fm.getFeatures(this)
|
||||
this.features = await getFeatures(this)
|
||||
this.isAdmin = await this.isClusterAdmin()
|
||||
this.nodes = await this.getNodeCount()
|
||||
this.kubeCtl = new Kubectl(this.version)
|
||||
@ -103,36 +82,37 @@ export class Cluster implements ClusterModel {
|
||||
this.eventCount = await this.getEventCount();
|
||||
}
|
||||
|
||||
public getPrometheusApiPrefix() {
|
||||
if (!this.preferences.prometheus?.prefix) {
|
||||
return ""
|
||||
}
|
||||
return this.preferences.prometheus.prefix
|
||||
proxyKubeconfigPath() {
|
||||
return this.kubeconfigManager.getPath()
|
||||
}
|
||||
|
||||
public save() {
|
||||
// clusterStore.saveCluster(this)
|
||||
proxyKubeconfig() {
|
||||
const kc = new KubeConfig()
|
||||
kc.loadFromFile(this.proxyKubeconfigPath())
|
||||
return kc
|
||||
}
|
||||
|
||||
public toClusterInfo(): ClusterInfo {
|
||||
return {
|
||||
id: this.id,
|
||||
workspace: this.workspace,
|
||||
url: this.url,
|
||||
contextName: this.contextName,
|
||||
apiUrl: this.apiUrl,
|
||||
online: this.online,
|
||||
accessible: this.accessible,
|
||||
failureReason: this.failureReason,
|
||||
nodes: this.nodes,
|
||||
version: this.version,
|
||||
distribution: this.distribution,
|
||||
isAdmin: this.isAdmin,
|
||||
features: this.features,
|
||||
kubeCtl: this.kubeCtl,
|
||||
kubeConfigPath: this.kubeConfigPath,
|
||||
preferences: this.preferences
|
||||
}
|
||||
stopServer() {
|
||||
this.contextHandler.stopServer()
|
||||
}
|
||||
|
||||
async installFeature(name: string, config: any) {
|
||||
await installFeature(name, this, config)
|
||||
await this.refreshCluster()
|
||||
}
|
||||
|
||||
async upgradeFeature(name: string, config: any) {
|
||||
await upgradeFeature(name, this, config)
|
||||
await this.refreshCluster()
|
||||
}
|
||||
|
||||
async uninstallFeature(name: string) {
|
||||
await uninstallFeature(name, this)
|
||||
await this.refreshCluster()
|
||||
}
|
||||
|
||||
getPrometheusApiPrefix() {
|
||||
return this.preferences.prometheus?.prefix || ""
|
||||
}
|
||||
|
||||
protected async k8sRequest(path: string, opts?: request.RequestPromiseOptions) {
|
||||
@ -140,7 +120,9 @@ export class Cluster implements ClusterModel {
|
||||
json: true,
|
||||
timeout: 10000
|
||||
}, (opts || {}))
|
||||
if (!options.headers) { options.headers = {} }
|
||||
if (!options.headers) {
|
||||
options.headers = {}
|
||||
}
|
||||
options.headers.host = `${this.id}.localhost:${this.port}`
|
||||
return request(`http://127.0.0.1:${this.port}${apiPrefix.KUBE_BASE}${path}`, options)
|
||||
}
|
||||
@ -157,18 +139,15 @@ export class Cluster implements ClusterModel {
|
||||
if (error.statusCode >= 400 && error.statusCode < 500) {
|
||||
this.failureReason = "Invalid credentials";
|
||||
return ClusterStatus.AccessDenied;
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
this.failureReason = error.error || error.message;
|
||||
return ClusterStatus.Offline;
|
||||
}
|
||||
}
|
||||
else if (error.failed === true) {
|
||||
} else if (error.failed === true) {
|
||||
if (error.timedOut === true) {
|
||||
this.failureReason = "Connection timed out";
|
||||
return ClusterStatus.Offline;
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
this.failureReason = "Failed to fetch credentials";
|
||||
return ClusterStatus.AccessDenied;
|
||||
}
|
||||
@ -178,7 +157,7 @@ export class Cluster implements ClusterModel {
|
||||
}
|
||||
}
|
||||
|
||||
public async canI(resourceAttributes: V1ResourceAttributes): Promise<boolean> {
|
||||
async canI(resourceAttributes: V1ResourceAttributes): Promise<boolean> {
|
||||
const authApi = this.proxyKubeconfig().makeApiClient(AuthorizationV1Api)
|
||||
try {
|
||||
const accessReview = await authApi.createSelfSubjectAccessReview({
|
||||
@ -193,7 +172,7 @@ export class Cluster implements ClusterModel {
|
||||
}
|
||||
}
|
||||
|
||||
protected async isClusterAdmin(): Promise<boolean> {
|
||||
async isClusterAdmin(): Promise<boolean> {
|
||||
return this.canI({
|
||||
namespace: "kube-system",
|
||||
resource: "*",
|
||||
@ -202,28 +181,14 @@ export class Cluster implements ClusterModel {
|
||||
}
|
||||
|
||||
protected detectKubernetesDistribution(kubernetesVersion: string): string {
|
||||
if (kubernetesVersion.includes("gke")) {
|
||||
return "gke"
|
||||
}
|
||||
else if (kubernetesVersion.includes("eks")) {
|
||||
return "eks"
|
||||
}
|
||||
else if (kubernetesVersion.includes("IKS")) {
|
||||
return "iks"
|
||||
}
|
||||
else if (this.apiUrl.endsWith("azmk8s.io")) {
|
||||
return "aks"
|
||||
}
|
||||
else if (this.apiUrl.endsWith("k8s.ondigitalocean.com")) {
|
||||
return "digitalocean"
|
||||
}
|
||||
else if (this.contextHandler.contextName.startsWith("minikube")) {
|
||||
return "minikube"
|
||||
}
|
||||
else if (kubernetesVersion.includes("+")) {
|
||||
return "custom"
|
||||
}
|
||||
|
||||
const { apiUrl, contextName } = this
|
||||
if (kubernetesVersion.includes("gke")) return "gke"
|
||||
if (kubernetesVersion.includes("eks")) return "eks"
|
||||
if (kubernetesVersion.includes("IKS")) return "iks"
|
||||
if (apiUrl.endsWith("azmk8s.io")) return "aks"
|
||||
if (apiUrl.endsWith("k8s.ondigitalocean.com")) return "digitalocean"
|
||||
if (contextName.startsWith("minikube")) return "minikube"
|
||||
if (kubernetesVersion.includes("+")) return "custom"
|
||||
return "vanilla"
|
||||
}
|
||||
|
||||
@ -237,7 +202,7 @@ export class Cluster implements ClusterModel {
|
||||
}
|
||||
}
|
||||
|
||||
public async getEventCount(): Promise<number> {
|
||||
async getEventCount(): Promise<number> {
|
||||
if (!this.isAdmin) {
|
||||
return 0;
|
||||
}
|
||||
@ -251,20 +216,19 @@ export class Cluster implements ClusterModel {
|
||||
try {
|
||||
const pod = (await client.readNamespacedPod(w.involvedObject.name, w.involvedObject.namespace)).body;
|
||||
logger.debug(`checking pod ${w.involvedObject.namespace}/${w.involvedObject.name}`)
|
||||
if (k8s.podHasIssues(pod)) {
|
||||
if (podHasIssues(pod)) {
|
||||
uniqEventSources.add(w.involvedObject.uid);
|
||||
}
|
||||
} catch (err) {
|
||||
}
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
uniqEventSources.add(w.involvedObject.uid);
|
||||
}
|
||||
}
|
||||
let nodeNotificationCount = 0;
|
||||
const nodes = (await client.listNode()).body.items;
|
||||
nodes.map(n => {
|
||||
nodeNotificationCount = nodeNotificationCount + k8s.getNodeWarningConditions(n).length
|
||||
nodeNotificationCount = nodeNotificationCount + getNodeWarningConditions(n).length
|
||||
});
|
||||
return uniqEventSources.size + nodeNotificationCount;
|
||||
} catch (error) {
|
||||
@ -272,4 +236,14 @@ export class Cluster implements ClusterModel {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
toJSON(): ClusterModel {
|
||||
return {
|
||||
id: this.id,
|
||||
contextName: this.contextName,
|
||||
kubeConfigPath: this.kubeConfigPath,
|
||||
workspace: this.workspace,
|
||||
preferences: this.preferences,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,11 +1,22 @@
|
||||
import fs from "fs";
|
||||
import path from "path"
|
||||
import * as hb from "handlebars"
|
||||
import hb from "handlebars"
|
||||
import { ResourceApplier } from "./resource-applier"
|
||||
import { KubeConfig, CoreV1Api, Watch } from "@kubernetes/client-node"
|
||||
import logger from "./logger";
|
||||
import { Cluster } from "./cluster";
|
||||
|
||||
export type FeatureInstallRequest = {
|
||||
clusterId: string;
|
||||
name: string;
|
||||
config?: any;
|
||||
}
|
||||
|
||||
export type FeatureInstallResponse = {
|
||||
success: boolean;
|
||||
message: string;
|
||||
}
|
||||
|
||||
export type FeatureStatus = {
|
||||
currentVersion: string;
|
||||
installed: boolean;
|
||||
|
||||
@ -78,7 +78,8 @@ async function main() {
|
||||
]);
|
||||
|
||||
// create cluster manager
|
||||
clusterManager = new ClusterManager(clusterStore.clusters, port)
|
||||
clusterManager = new ClusterManager(port)
|
||||
|
||||
// run proxy
|
||||
try {
|
||||
proxyServer = proxy.listen(port, clusterManager)
|
||||
|
||||
@ -1,10 +1,8 @@
|
||||
import * as k8s from "@kubernetes/client-node"
|
||||
import * as os from "os"
|
||||
import * as yaml from "js-yaml"
|
||||
import k8s from "@kubernetes/client-node"
|
||||
import os from "os"
|
||||
import yaml from "js-yaml"
|
||||
import logger from "./logger";
|
||||
|
||||
const kc = new k8s.KubeConfig()
|
||||
|
||||
function resolveTilde(filePath: string) {
|
||||
if (filePath[0] === "~" && (filePath[1] === "/" || filePath.length === 1)) {
|
||||
return filePath.replace("~", os.homedir());
|
||||
@ -12,9 +10,10 @@ function resolveTilde(filePath: string) {
|
||||
return filePath;
|
||||
}
|
||||
|
||||
export function loadConfig(kubeconfig: string): k8s.KubeConfig {
|
||||
if (kubeconfig) {
|
||||
kc.loadFromFile(resolveTilde(kubeconfig))
|
||||
export function loadConfig(kubeConfigPath?: string): k8s.KubeConfig {
|
||||
const kc = new k8s.KubeConfig()
|
||||
if (kubeConfigPath) {
|
||||
kc.loadFromFile(resolveTilde(kubeConfigPath))
|
||||
} else {
|
||||
kc.loadFromDefault();
|
||||
}
|
||||
@ -22,28 +21,29 @@ export function loadConfig(kubeconfig: string): k8s.KubeConfig {
|
||||
}
|
||||
|
||||
/**
|
||||
* KubeConfig is valid when there's atleast one of each defined:
|
||||
* KubeConfig is valid when there's at least one of each defined:
|
||||
* - User
|
||||
* - Cluster
|
||||
* - Context
|
||||
*
|
||||
* @param config KubeConfig to check
|
||||
*/
|
||||
export function validateConfig(config: k8s.KubeConfig): boolean {
|
||||
export function validateConfig(config: k8s.KubeConfig | string): k8s.KubeConfig {
|
||||
if(typeof config == "string") {
|
||||
config = loadConfig(config);
|
||||
}
|
||||
|
||||
logger.debug(`validating kube config: ${JSON.stringify(config)}`)
|
||||
if(!config.users || config.users.length == 0) {
|
||||
throw new Error("No users provided in config")
|
||||
}
|
||||
|
||||
if(!config.clusters || config.clusters.length == 0) {
|
||||
throw new Error("No clusters provided in config")
|
||||
}
|
||||
|
||||
if(!config.contexts || config.contexts.length == 0) {
|
||||
throw new Error("No contexts provided in config")
|
||||
}
|
||||
|
||||
return true
|
||||
return config
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -29,7 +29,6 @@ Vue.mixin({
|
||||
}
|
||||
})
|
||||
|
||||
// any initialization we want to do for app state
|
||||
setTimeout(async () => {
|
||||
await Promise.all([
|
||||
userStore.whenLoaded,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user