1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

ipc refactoring, push-events handling

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2020-07-16 14:47:05 +03:00
parent ae9afc2894
commit dec9d92032
7 changed files with 141 additions and 66 deletions

View File

@ -6,7 +6,7 @@ import { action, observable, reaction, toJS, when } from "mobx";
import Singleton from "./utils/singleton";
import { getAppVersion } from "./utils/app-version";
import logger from "../main/logger";
import { broadcastMessage } from "./ipc-helpers";
import { sendMessage } from "./ipc";
import isEqual from "lodash/isEqual";
export interface BaseStoreParams<T = any> extends ConfOptions<T> {
@ -110,7 +110,7 @@ export class BaseStore<T = any> extends Singleton {
protected async onModelChange(model: T) {
if (ipcMain) {
this.save(model); // save config file
broadcastMessage({ channel: this.syncChannel }, model); // broadcast to renderer views
sendMessage({ channel: this.syncChannel, args: model }); // broadcast to renderer views
}
// send "update-request" to main-process
if (ipcRenderer) {

View File

@ -1,3 +1,4 @@
import type { WorkspaceId } from "./workspace-store";
import { action, computed, observable, toJS } from "mobx";
import { v4 as uuid } from "uuid"
import { BaseStore } from "./base-store";
@ -13,7 +14,7 @@ export type ClusterId = string;
export interface ClusterModel {
id: ClusterId;
workspace?: string;
workspace?: WorkspaceId;
contextName?: string;
preferences?: ClusterPreferences;
kubeConfigPath: string;

View File

@ -7,7 +7,7 @@ import logger from "../main/logger";
export type IpcChannel = string;
export interface IpcMessageOptions {
export interface IpcHandleOpts {
timeout?: number;
}
@ -15,29 +15,35 @@ export interface IpcMessageHandler<T extends any[] = any> {
(...args: T): any;
}
export interface IpcBroadcastOpts {
export interface IpcMessageOpts<A extends any[] = any> {
channel: IpcChannel
webContentId?: number; // sends to single webContents view
filter?: (webContent: WebContents) => boolean
timeout?: number; // fixme: support
args?: A;
}
export function broadcastMessage({ channel, filter }: IpcBroadcastOpts, ...args: any[]) {
if (!filter) {
filter = webContent => webContent.getType() === "window"
export function sendMessage({ channel, webContentId, filter, args = [] }: IpcMessageOpts) {
const singleView = webContentId ? webContents.fromId(webContentId) : null;
let views = singleView ? [singleView] : webContents.getAllWebContents();
if (filter) {
views = views.filter(filter);
}
webContents.getAllWebContents().filter(filter).forEach(webContent => {
logger.debug(`[IPC]: broadcasting ${channel} to ${webContent.getType()}=${webContent.id}`);
webContent.send(channel, ...args);
views.forEach(webContent => {
const type = webContent.getType();
logger.info(`[IPC]: sending message "${channel}" to webContentId(${type})=${webContent.id}`);
webContent.send(channel, ...[args].flat());
})
}
// todo: support timeout
export async function invokeMessage<T = any>(channel: IpcChannel, ...args: any[]): Promise<T> {
logger.debug(`[IPC]: invoke channel "${channel}"`, { args });
// todo: support timeout + merge with sendMessage?
export async function invokeMessage<T extends any[], R = any>(channel: IpcChannel, ...args: T): Promise<R> {
logger.debug(`[IPC]: invoke channel "${channel}"`, args);
return ipcRenderer.invoke(channel, ...args);
}
// todo: make isomorphic api
export function handleMessage<T extends any[]>(channel: IpcChannel, handler: IpcMessageHandler<T>, options: IpcMessageOptions = {}) {
export function handleMessage<T extends any[]>(channel: IpcChannel, handler: IpcMessageHandler<T>, options: IpcHandleOpts = {}) {
const { timeout = 0 } = options;
ipcMain.handle(channel, async (event, ...args: T) => {
logger.info(`[IPC]: handle "${channel}"`, { event, args });
@ -60,7 +66,7 @@ export function handleMessage<T extends any[]>(channel: IpcChannel, handler: Ipc
})
}
export function handleMessages(messages: Record<string, IpcMessageHandler>, options?: IpcMessageOptions) {
export function handleMessages(messages: Record<string, IpcMessageHandler>, options?: IpcHandleOpts) {
Object.entries(messages).forEach(([channel, handler]) => {
handleMessage(channel, handler, options);
})

View File

@ -6,7 +6,7 @@ import { copyFile, ensureDir } from "fs-extra"
import filenamify from "filenamify"
import { apiKubePrefix, appProto } from "../common/vars";
import { ClusterId, ClusterModel, clusterStore } from "../common/cluster-store"
import { handleMessages } from "../common/ipc-helpers";
import { handleMessages } from "../common/ipc";
import { ClusterIpcMessage } from "../common/ipc-messages";
import { tracker } from "../common/tracker";
import { validateConfig } from "./k8s";
@ -28,22 +28,14 @@ export class ClusterManager {
constructor(public readonly port: number) {
// auto-init clusters
autorun(() => {
const freshClusters = clusterStore.clustersList.filter(cluster => !cluster.initialized);
freshClusters.forEach(cluster => cluster.init(port));
clusterStore.clustersList
.filter(cluster => !cluster.initialized)
.forEach(cluster => cluster.init(port));
});
// auto-stop removed clusters
autorun(() => {
const removedClusters = clusterStore.removedClusters;
if (removedClusters.size > 0) {
removedClusters.forEach(cluster => cluster.stop());
removedClusters.clear();
}
});
// auto-refresh status for active cluster
autorun(() => {
if (clusterStore.activeCluster) {
clusterStore.activeCluster.refreshStatus();
}
clusterStore.removedClusters.forEach(cluster => cluster.stop());
clusterStore.removedClusters.clear();
});
// listen ipc-events
ClusterManager.ipcListen(this);

View File

@ -1,7 +1,9 @@
import type { ClusterId, ClusterModel, ClusterPreferences } from "../common/cluster-store"
import type { FeatureStatusMap } from "./feature"
import { action, observable, toJS, when } from "mobx";
import type { WorkspaceId } from "../common/workspace-store";
import { action, observable, reaction, toJS, when } from "mobx";
import { apiKubePrefix } from "../common/vars";
import { sendMessage } from "../common/ipc";
import { ContextHandler } from "./context-handler"
import { AuthorizationV1Api, CoreV1Api, KubeConfig, V1ResourceAttributes } from "@kubernetes/client-node"
import { Kubectl } from "./kubectl";
@ -11,14 +13,25 @@ import { getFeatures, installFeature, uninstallFeature, upgradeFeature } from ".
import request, { RequestPromiseOptions } from "request-promise-native"
import logger from "./logger"
// fixme: push cluster-state/status info to views on change
enum ClusterStatus {
AccessGranted = 2,
AccessDenied = 1,
Offline = 0
}
export interface ClusterState extends ClusterModel {
apiUrl: string;
online?: boolean;
accessible?: boolean;
failureReason?: string;
nodes?: number;
eventCount?: number;
version?: string;
distribution?: string;
isAdmin?: boolean;
features?: FeatureStatusMap;
}
export class Cluster implements ClusterModel {
public id: ClusterId;
public kubeCtl: Kubectl
@ -26,10 +39,11 @@ export class Cluster implements ClusterModel {
protected kubeconfigManager: KubeconfigManager;
public whenReady = when(() => this.initialized);
protected disposers: CallableFunction[] = [];
@observable initialized = false;
@observable contextName: string;
@observable workspace: string;
@observable workspace: WorkspaceId;
@observable kubeConfigPath: string;
@observable apiUrl: string; // cluster server url
@observable kubeProxyUrl: string; // lens-proxy to kube-api url
@ -41,7 +55,7 @@ export class Cluster implements ClusterModel {
@observable version: string;
@observable distribution = "unknown";
@observable isAdmin = false;
@observable eventCount = 0; // todo: auto-fetch every 3s and push updates to client (?)
@observable eventCount = 0;
@observable preferences: ClusterPreferences = {};
@observable features: FeatureStatusMap = {};
@ -78,10 +92,37 @@ export class Cluster implements ClusterModel {
}
}
bindEvents(viewId: number) {
if (!this.initialized) return;
const refreshStatusTimer = setInterval(() => this.refreshStatus(), 30000); // every 30s
const refreshEventsTimer = setInterval(() => this.refreshEvents(), 3000); // every 3s
this.disposers.push(
() => clearTimeout(refreshStatusTimer),
() => clearTimeout(refreshEventsTimer),
reaction(() => this.getState(), clusterState => {
sendMessage({
channel: "cluster:state",
webContentId: viewId,
args: clusterState,
})
}, {
fireImmediately: true
})
);
}
unbindEvents() {
this.disposers.forEach(dispose => dispose());
this.disposers.length = 0;
}
stop() {
if (!this.initialized) return;
this.contextHandler.stopServer();
this.kubeconfigManager.unlink();
this.unbindEvents();
}
@action
@ -98,6 +139,11 @@ export class Cluster implements ClusterModel {
this.kubeCtl = new Kubectl(this.version)
this.kubeCtl.ensureKubectl()
}
await this.refreshEvents();
}
@action
async refreshEvents() {
this.eventCount = await this.getEventCount();
}
@ -255,14 +301,32 @@ export class Cluster implements ClusterModel {
}
toJSON(): ClusterModel {
return toJS({
const model: ClusterModel = {
id: this.id,
contextName: this.contextName,
kubeConfigPath: this.kubeConfigPath,
workspace: this.workspace,
preferences: this.preferences,
}, {
};
return toJS(model, {
recurseEverything: true
})
}
// serializable full-featured state of the cluster
getState(): ClusterState {
return {
...this.toJSON(),
apiUrl: this.apiUrl,
online: this.online,
accessible: this.accessible,
failureReason: this.failureReason,
nodes: this.nodes,
version: this.version,
distribution: this.distribution,
isAdmin: this.isAdmin,
features: this.features,
eventCount: this.eventCount,
}
}
}

View File

@ -1,6 +1,6 @@
import { ChildProcess, spawn } from "child_process"
import { waitUntilUsed } from "tcp-port-used";
import { broadcastMessage } from "../common/ipc-helpers";
import { sendMessage } from "../common/ipc";
import type { Cluster } from "./cluster"
import { bundledKubectl, Kubectl } from "./kubectl"
import logger from "./logger"
@ -84,7 +84,7 @@ export class KubeAuthProxy {
const channel = `kube-auth:${this.cluster.id}`
const message = { data, stream };
logger.debug(channel, message);
broadcastMessage({ channel }, message); // todo: send message only to cluster's window
sendMessage({ channel, args: message }); // todo: send message only to cluster's window
}
public exit() {

View File

@ -1,15 +1,14 @@
import path from "path";
import { reaction } from "mobx";
import { BrowserWindow, shell } from "electron"
import windowStateKeeper from "electron-window-state"
import type { ClusterId } from "../common/cluster-store";
import { clusterStore } from "../common/cluster-store";
import logger from "./logger";
import { appName } from "../common/vars";
// fixme: remove switching view delay on first load
export class WindowManager {
protected activeClusterId: ClusterId;
protected activeView: BrowserWindow;
protected views = new Map<ClusterId, BrowserWindow>();
protected disposers: CallableFunction[] = [];
@ -33,39 +32,51 @@ export class WindowManager {
defaultWidth: 1440,
});
// init events and show active cluster view
this.bindEvents();
// handle initial view load without clusters
if (!clusterStore.clusters.size) {
this.initNoClustersView();
}
}
// fixme: first run without clusters
protected async initNoClustersView() {
const htmlView = path.join(__dirname, `${appName}.html`);
const view = this.initView(undefined);
await view.loadFile(htmlView);
view.show();
this.hideSplash();
}
protected bindEvents() {
// Manage reactive state
this.disposers.push(
// auto-show active cluster window and subscribe for push-events
reaction(() => clusterStore.activeCluster, async activeCluster => {
if (this.activeClusterId) {
const prevCluster = clusterStore.getById(this.activeClusterId);
if (prevCluster) prevCluster.unbindEvents();
this.activeClusterId = null;
}
if (activeCluster) {
this.activeClusterId = activeCluster.id;
const viewId = await this.activateView(activeCluster.id);
if (viewId) {
await activeCluster.refreshStatus();
activeCluster.bindEvents(viewId);
}
}
}, {
fireImmediately: true,
delay: 250,
}),
// auto-destroy views for removed clusters
reaction(() => clusterStore.removedClusters.toJS(), removedClusters => {
removedClusters.forEach(cluster => {
this.destroyView(cluster.id);
});
}),
// auto-show active cluster view
reaction(() => clusterStore.activeClusterId, clusterId => this.activateView(clusterId), {
fireImmediately: true,
})
)
);
// handle initial view load without clusters
// if (!clusterStore.clusters.size) {
// this.initNoClustersView();
// }
}
// fixme: first run without clusters
// protected async initNoClustersView() {
// const htmlView = path.join(__dirname, `${appName}.html`);
// const view = this.initView(undefined);
// await view.loadFile(htmlView);
// view.show();
// this.hideSplash();
// }
async showSplash() {
await this.splashWindow.loadURL("static://splash.html")
this.splashWindow.show();
@ -79,7 +90,7 @@ export class WindowManager {
return this.views.get(clusterId);
}
async activateView(clusterId: ClusterId) {
async activateView(clusterId: ClusterId): Promise<number> {
const cluster = clusterStore.getById(clusterId);
if (!cluster) {
return;
@ -107,6 +118,7 @@ export class WindowManager {
activeView.hide();
}
view.show();
return view.id;
}
} catch (err) {
logger.error(`[WINDOW-MANAGER]: can't activate cluster view`, {