diff --git a/src/common/catalog/catalog-entity.ts b/src/common/catalog/catalog-entity.ts index 7f5f8bbe73..a50d5c43e6 100644 --- a/src/common/catalog/catalog-entity.ts +++ b/src/common/catalog/catalog-entity.ts @@ -207,7 +207,7 @@ export interface CatalogEntityMetadata { description?: string; source?: string; labels: Record; - [key: string]: string | object; + [key: string]: string | Record; } export interface CatalogEntityStatus { @@ -220,6 +220,7 @@ export interface CatalogEntityStatus { enabled?: boolean; message?: string; active?: boolean; + [key: string]: string | number | boolean; } export interface CatalogEntityActionContext { diff --git a/src/common/catalog/entity-sync.ts b/src/common/catalog/entity-sync.ts new file mode 100644 index 0000000000..f95000b2ea --- /dev/null +++ b/src/common/catalog/entity-sync.ts @@ -0,0 +1,38 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ + +import type { RequireAtLeastOne } from "type-fest"; +import type { CatalogEntityData } from "./catalog-entity"; + +export interface EntityChangeEvents { + add: (data: RawCatalogEntity) => void; + update: (uid: string, data: RawCatalogEntityUpdate) => void; + delete: (uid: string) => void; +} + +export interface RawCatalogEntity extends CatalogEntityData { + kind: string; + apiVersion: string; +} + +export type RawCatalogEntityUpdate = RequireAtLeastOne; + +export interface CatalogSyncAddMessage { + type: "add"; + data: RawCatalogEntity; +} + +export interface CatalogSyncUpdateMessage { + type: "update", + uid: string; + data: RawCatalogEntityUpdate; +} + +export interface CatalogSyncDeleteMessage { + type: "delete", + uid: string; +} + +export type CatalogSyncMessage = CatalogSyncAddMessage | CatalogSyncUpdateMessage | CatalogSyncDeleteMessage; diff --git a/src/common/ipc/catalog.ts b/src/common/ipc/catalog.ts index 3d316d211c..50d9a46704 100644 --- a/src/common/ipc/catalog.ts +++ b/src/common/ipc/catalog.ts @@ -7,13 +7,3 @@ * This is used to activate a specific entity in the renderer main frame */ export const catalogEntityRunListener = "catalog-entity:run"; - -/** - * This is broadcast on whenever there is an update to any catalog item - */ -export const catalogItemsChannel = "catalog:items"; - -/** - * This can be sent from renderer to main to initialize a broadcast of ITEMS - */ -export const catalogInitChannel = "catalog:init"; diff --git a/src/common/utils/objects.ts b/src/common/utils/objects.ts index 0f213aa5f5..4d97dfb51e 100644 --- a/src/common/utils/objects.ts +++ b/src/common/utils/objects.ts @@ -10,3 +10,7 @@ export function fromEntries(entries: Iterable): { [k in Key]: T } { return Object.fromEntries(entries) as { [k in Key]: T }; } + +export function entries(obj: Record): [Key, T][] { + return Object.entries(obj) as [Key, T][]; +} diff --git a/src/common/vars.ts b/src/common/vars.ts index abbf09e877..bfdbdcdebd 100644 --- a/src/common/vars.ts +++ b/src/common/vars.ts @@ -54,6 +54,8 @@ defineGlobal("__static", { // Apis export const apiPrefix = "/api" as string; // local router apis export const apiKubePrefix = "/api-kube" as string; // k8s cluster apis +export const shellRoute = "/shell" as string; +export const catalogSyncRoute = "/catalog-sync" as string; // Links export const issuesTrackerUrl = "https://github.com/lensapp/lens/issues" as string; diff --git a/src/main/catalog-pusher.ts b/src/main/catalog-pusher.ts deleted file mode 100644 index 14cf8cc5e9..0000000000 --- a/src/main/catalog-pusher.ts +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (c) OpenLens Authors. All rights reserved. - * Licensed under MIT License. See LICENSE in root directory for more information. - */ - -import { reaction } from "mobx"; -import { broadcastMessage, ipcMainOn } from "../common/ipc"; -import type { CatalogEntityRegistry } from "./catalog"; -import "../common/catalog-entities/kubernetes-cluster"; -import { disposer, toJS } from "../common/utils"; -import { debounce } from "lodash"; -import type { CatalogEntity } from "../common/catalog"; -import { catalogInitChannel, catalogItemsChannel } from "../common/ipc/catalog"; - -const broadcaster = debounce((items: CatalogEntity[]) => { - broadcastMessage(catalogItemsChannel, items); -}, 1_000, { leading: true, trailing: true }); - -export function pushCatalogToRenderer(catalog: CatalogEntityRegistry) { - return disposer( - ipcMainOn(catalogInitChannel, () => broadcaster(toJS(catalog.items))), - reaction(() => toJS(catalog.items), (items) => { - broadcaster(items); - }, { - fireImmediately: true, - }), - ); -} diff --git a/src/main/catalog/catalog-entity-registry.ts b/src/main/catalog/catalog-entity-registry.ts index 5c6e760052..8b4ee7cf4f 100644 --- a/src/main/catalog/catalog-entity-registry.ts +++ b/src/main/catalog/catalog-entity-registry.ts @@ -26,13 +26,15 @@ export class CatalogEntityRegistry { this.sources.delete(id); } - @computed get items(): CatalogEntity[] { - return Array.from( - iter.filter( - iter.flatMap(this.sources.values(), source => source.get()), - entity => this.categoryRegistry.getCategoryForEntity(entity), - ), - ); + readonly entities = computed(() => Array.from( + iter.filter( + iter.flatMap(this.sources.values(), source => source.get()), + entity => this.categoryRegistry.getCategoryForEntity(entity), + ), + )); + + get items(): CatalogEntity[] { + return this.entities.get(); } getById(id: string): T | undefined { diff --git a/src/main/catalog/entities.injectable.ts b/src/main/catalog/entities.injectable.ts new file mode 100644 index 0000000000..96d55a5f4d --- /dev/null +++ b/src/main/catalog/entities.injectable.ts @@ -0,0 +1,17 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ +import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable"; +import catalogEntityRegistryInjectable from "./entity-registry.injectable"; + +const catalogEntitiesInjectable = getInjectable({ + instantiate: (di) => { + const registry = di.inject(catalogEntityRegistryInjectable); + + return registry.entities; + }, + lifecycle: lifecycleEnum.singleton, +}); + +export default catalogEntitiesInjectable; diff --git a/src/main/catalog/entity-registry.injectable.ts b/src/main/catalog/entity-registry.injectable.ts new file mode 100644 index 0000000000..c6c2db61c3 --- /dev/null +++ b/src/main/catalog/entity-registry.injectable.ts @@ -0,0 +1,13 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ +import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable"; +import { catalogEntityRegistry } from "./catalog-entity-registry"; + +const catalogEntityRegistryInjectable = getInjectable({ + instantiate: () => catalogEntityRegistry, + lifecycle: lifecycleEnum.singleton, +}); + +export default catalogEntityRegistryInjectable; diff --git a/src/main/index.ts b/src/main/index.ts index 473a92c370..522820ef95 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -26,8 +26,6 @@ import { disposer, getAppVersion, getAppVersionFromProxyServer } from "../common import { ipcMainOn } from "../common/ipc"; import { startUpdateChecking } from "./app-updater"; import { IpcRendererNavigationEvents } from "../renderer/navigation/events"; -import { pushCatalogToRenderer } from "./catalog-pusher"; -import { catalogEntityRegistry } from "./catalog"; import { HelmRepoManager } from "./helm/helm-repo-manager"; import { syncGeneralEntities, syncWeblinks } from "./catalog-sources"; import configurePackages from "../common/configure-packages"; @@ -55,6 +53,7 @@ import routerInjectable from "./router/router.injectable"; import shellApiRequestInjectable from "./proxy-functions/shell-api-request/shell-api-request.injectable"; import userStoreInjectable from "../common/user-store/user-store.injectable"; import trayMenuItemsInjectable from "./tray/tray-menu-items.injectable"; +import catalogApiRequestHandlerInjectable from "./proxy-functions/catalog-api-request/handler.injectable"; const di = getDi(); @@ -63,7 +62,6 @@ app.setName(appName); di.runSetups().then(() => { injectSystemCAs(); - const onCloseCleanup = disposer(); const onQuitCleanup = disposer(); SentryInit(); @@ -163,11 +161,13 @@ di.runSetups().then(() => { const router = di.inject(routerInjectable); const shellApiRequest = di.inject(shellApiRequestInjectable); + const catalogApiRequest = di.inject(catalogApiRequestHandlerInjectable); const lensProxy = LensProxy.createInstance(router, { getClusterForRequest: (req) => ClusterManager.getInstance().getClusterForRequest(req), kubeApiRequest, shellApiRequest, + catalogApiRequest, }); ClusterManager.createInstance().init(); @@ -244,8 +244,6 @@ di.runSetups().then(() => { } ipcMainOn(IpcRendererNavigationEvents.LOADED, async () => { - onCloseCleanup.push(pushCatalogToRenderer(catalogEntityRegistry)); - const directoryForKubeConfigs = di.inject(directoryForKubeConfigsInjectable); await ensureDir(directoryForKubeConfigs); @@ -320,8 +318,6 @@ di.runSetups().then(() => { kubeConfigSyncManager.stopSync(); - onCloseCleanup(); - // This is set to false here so that LPRM can wait to send future lens:// // requests until after it loads again lensProtocolRouterMain.rendererLoaded = false; diff --git a/src/main/initializers/init-ipc-main-handlers/init-ipc-main-handlers.ts b/src/main/initializers/init-ipc-main-handlers/init-ipc-main-handlers.ts index 2986d3319e..c58bc6ab46 100644 --- a/src/main/initializers/init-ipc-main-handlers/init-ipc-main-handlers.ts +++ b/src/main/initializers/init-ipc-main-handlers/init-ipc-main-handlers.ts @@ -10,8 +10,6 @@ import type { ClusterId } from "../../../common/cluster-types"; import { ClusterStore } from "../../../common/cluster-store/cluster-store"; import { appEventBus } from "../../../common/app-event-bus/event-bus"; import { broadcastMainChannel, broadcastMessage, ipcMainHandle, ipcMainOn } from "../../../common/ipc"; -import { catalogEntityRegistry } from "../../catalog"; -import { pushCatalogToRenderer } from "../../catalog-pusher"; import { ClusterManager } from "../../cluster-manager"; import { ResourceApplier } from "../../resource-applier"; import { WindowManager } from "../../window-manager"; @@ -43,8 +41,6 @@ export const initIpcMainHandlers = ({ electronMenuItems, directoryForLensLocalSt if (cluster) { clusterFrameMap.set(cluster.id, { frameId: event.frameId, processId: event.processId }); cluster.pushState(); - - pushCatalogToRenderer(catalogEntityRegistry); } }); diff --git a/src/main/lens-proxy.ts b/src/main/lens-proxy.ts index 3a1c4a829e..170d8fe387 100644 --- a/src/main/lens-proxy.ts +++ b/src/main/lens-proxy.ts @@ -11,18 +11,20 @@ import { apiPrefix, apiKubePrefix } from "../common/vars"; import type { Router } from "./router"; import type { ContextHandler } from "./context-handler/context-handler"; import logger from "./logger"; -import { Singleton } from "../common/utils"; +import { entries, Singleton } from "../common/utils"; import type { Cluster } from "../common/cluster/cluster"; -import type { ProxyApiRequestArgs } from "./proxy-functions"; +import type { ClusterProxyApiRequestArgs, KubeApiRequestArgs, ProxyApiRequestArgs } from "./proxy-functions"; import { appEventBus } from "../common/app-event-bus/event-bus"; import { getBoolean } from "./utils/parse-query"; +import { matchPath } from "react-router"; -type GetClusterForRequest = (req: http.IncomingMessage) => Cluster | null; +export type GetClusterForRequest = (req: http.IncomingMessage) => Cluster | null; export interface LensProxyFunctions { getClusterForRequest: GetClusterForRequest, - shellApiRequest: (args: ProxyApiRequestArgs) => void | Promise; - kubeApiRequest: (args: ProxyApiRequestArgs) => void | Promise; + shellApiRequest: (args: ClusterProxyApiRequestArgs) => void; + catalogApiRequest: (args: ProxyApiRequestArgs) => Promise; + kubeApiRequest: (args: KubeApiRequestArgs) => Promise; } const watchParam = "watch"; @@ -52,6 +54,12 @@ const disallowedPorts = new Set([ 10080, ]); +enum ProxyRouteKey { + KUBE = "kube", + SHELL = "shell", + CATALOG = "catalog", +} + export class LensProxy extends Singleton { protected origin: string; protected proxyServer: http.Server; @@ -62,7 +70,7 @@ export class LensProxy extends Singleton { public port: number; - constructor(protected router: Router, { shellApiRequest, kubeApiRequest, getClusterForRequest }: LensProxyFunctions) { + constructor(protected router: Router, { shellApiRequest, kubeApiRequest, getClusterForRequest, catalogApiRequest }: LensProxyFunctions) { super(); this.getClusterForRequest = getClusterForRequest; @@ -76,19 +84,55 @@ export class LensProxy extends Singleton { this.handleRequest(req, res); }); - this.proxyServer - .on("upgrade", (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => { - const isInternal = req.url.startsWith(`${apiPrefix}?`); + const routes: Record = { + [ProxyRouteKey.KUBE]: `${apiKubePrefix}/:rest*`, + [ProxyRouteKey.CATALOG]: `${apiPrefix}/catalog-sync`, + [ProxyRouteKey.SHELL]: `${apiPrefix}/shell`, + }; + const handlers: Record Promise> = { + [ProxyRouteKey.KUBE]: (req, socket, head, restUrl) => { + req.url = `/${restUrl}`; + const cluster = getClusterForRequest(req); if (!cluster) { return void logger.error(`[LENS-PROXY]: Could not find cluster for upgrade request from url=${req.url}`); } - const reqHandler = isInternal ? shellApiRequest : kubeApiRequest; + return kubeApiRequest({ req, socket, head, cluster, restUrl }); + }, + [ProxyRouteKey.CATALOG]: (req, socket, head) => { + return catalogApiRequest({ req, socket, head }); + }, + [ProxyRouteKey.SHELL]: (req, socket, head) => { + const cluster = getClusterForRequest(req); - (async () => reqHandler({ req, socket, head, cluster }))() - .catch(error => logger.error("[LENS-PROXY]: failed to handle proxy upgrade", error)); + if (!cluster) { + return void logger.error(`[LENS-PROXY]: Could not find cluster for upgrade request from url=${req.url}`); + } + + shellApiRequest({ req, socket, head, cluster }); + }, + }; + + this.proxyServer + .on("upgrade", async (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => { + try { + for (const [type, matcher] of entries(routes)) { + const match = matchPath<{ rest?: string }>(req.url, { + path: matcher, + exact: true, + }); + + if (match) { + return await handlers[type](req, socket, head, match.params.rest); + } + } + + logger.warn(`[LENS-PROXY]: Tried to upgrade request with no matching handler, url=${req.url}`); + } catch (error) { + logger.error("[LENS-PROXY]: failed to handle proxy upgrade", error); + } }); } diff --git a/src/main/proxy-functions/catalog-api-request/handler.injectable.ts b/src/main/proxy-functions/catalog-api-request/handler.injectable.ts new file mode 100644 index 0000000000..78c4845957 --- /dev/null +++ b/src/main/proxy-functions/catalog-api-request/handler.injectable.ts @@ -0,0 +1,147 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ + +import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable"; +import { isEqual } from "lodash"; +import { autorun, IComputedValue } from "mobx"; +import type { CatalogEntity } from "../../../common/catalog"; +import { toJS } from "../../../renderer/utils"; +import catalogEntitiesInjectable from "../../catalog/entities.injectable"; +import type { ProxyApiRequestArgs } from "../types"; +import WebSocket, { Server as WebSocketServer } from "ws"; +import logger from "../../logger"; +import EventEmitter from "events"; +import type TypedEventEmitter from "typed-emitter"; +import type { RawCatalogEntity, RawCatalogEntityUpdate, EntityChangeEvents, CatalogSyncAddMessage, CatalogSyncDeleteMessage, CatalogSyncUpdateMessage } from "../../../common/catalog/entity-sync"; + +interface Dependencies { + entities: IComputedValue; +} + +function toRaw(entity: CatalogEntity): RawCatalogEntity { + return { + kind: entity.kind, + apiVersion: entity.apiVersion, + metadata: toJS(entity.metadata), + status: toJS(entity.status), + spec: toJS(entity.spec), + }; +} + +function createRawEntityUpdate(prevRaw: RawCatalogEntity, rawEntity: RawCatalogEntity): RawCatalogEntityUpdate | false { + const metadata = isEqual(prevRaw.metadata, rawEntity.metadata) + ? {} + : { metadata: rawEntity.metadata }; + const status = isEqual(prevRaw.status, rawEntity.status) + ? {} + : { status: rawEntity.status }; + const spec = isEqual(prevRaw.spec, rawEntity.spec) + ? {} + : { spec: rawEntity.spec }; + const res = { + ...metadata, + ...status, + ...spec, + }; + + if (!res.metadata && !res.spec && !res.status) { + return false; + } + + return res as RawCatalogEntityUpdate; +} + +function wrapWebsocketForChangeEvents(websocket: WebSocket): EntityChangeEvents { + return { + add: (data) => { + websocket.send(JSON.stringify({ + data, + type: "add", + } as CatalogSyncAddMessage)); + }, + delete: (uid) => { + websocket.send(JSON.stringify({ + uid, + type: "delete", + } as CatalogSyncDeleteMessage)); + }, + update: (uid, data) => { + websocket.send(JSON.stringify({ + uid, + data, + type: "update", + } as CatalogSyncUpdateMessage)); + }, + }; +} + +const catalogApiRequestHandler = ({ entities }: Dependencies) => { + const rawEntityMap = new Map(); + const entityChangeEmitter = new EventEmitter() as TypedEventEmitter; + + autorun(() => { + const currentIds = new Set(); + + for (const entity of entities.get()) { + currentIds.add(entity.getId()); + + const rawEntity = toRaw(entity); + + if (rawEntityMap.has(rawEntity.metadata.uid)) { + const prevRaw = rawEntityMap.get(rawEntity.metadata.uid); + const diff = createRawEntityUpdate(prevRaw, rawEntity); + + if (diff) { + rawEntityMap.set(rawEntity.metadata.uid, rawEntity); + entityChangeEmitter.emit("update", rawEntity.metadata.uid, diff); + } + } else { + rawEntityMap.set(rawEntity.metadata.uid, rawEntity); + entityChangeEmitter.emit("add", rawEntity); + } + } + + for (const rawEntityId of rawEntityMap.keys()) { + if (!currentIds.has(rawEntityId)) { + rawEntityMap.delete(rawEntityId); + entityChangeEmitter.emit("delete", rawEntityId); + } + } + }); + + return async ({ req, socket, head }: ProxyApiRequestArgs): Promise => { + const ws = new WebSocketServer({ noServer: true }); + + return ws.handleUpgrade(req, socket, head, (websocket) => { + logger.info("[CATALOG-SYNC]: starting new catalog entity sync"); + const events = wrapWebsocketForChangeEvents(websocket); + + for (const rawEntity of rawEntityMap.values()) { + // initialize with current values + events.add(rawEntity); + } + + // Set up passing changes on + entityChangeEmitter.on("add", events.add); + entityChangeEmitter.on("update", events.update); + entityChangeEmitter.on("delete", events.delete); + + websocket.on("close", () => { + entityChangeEmitter.off("add", events.add); + entityChangeEmitter.off("update", events.update); + entityChangeEmitter.off("delete", events.delete); + }); + }); + }; +}; + +const catalogApiRequestHandlerInjectable = getInjectable({ + instantiate: (di) => catalogApiRequestHandler({ + entities: di.inject(catalogEntitiesInjectable), + }), + lifecycle: lifecycleEnum.singleton, +}); + +export default catalogApiRequestHandlerInjectable; diff --git a/src/main/proxy-functions/kube-api-request.ts b/src/main/proxy-functions/kube-api-request.ts index 8995d965e5..09152ae036 100644 --- a/src/main/proxy-functions/kube-api-request.ts +++ b/src/main/proxy-functions/kube-api-request.ts @@ -5,16 +5,19 @@ import { chunk } from "lodash"; import net from "net"; -import url from "url"; -import { apiKubePrefix } from "../../common/vars"; -import type { ProxyApiRequestArgs } from "./types"; +import { parse } from "url"; +import type { ClusterProxyApiRequestArgs } from "."; const skipRawHeaders = new Set(["Host", "Authorization"]); -export async function kubeApiRequest({ req, socket, head, cluster }: ProxyApiRequestArgs) { - const proxyUrl = await cluster.contextHandler.resolveAuthProxyUrl() + req.url.replace(apiKubePrefix, ""); - const apiUrl = url.parse(cluster.apiUrl); - const pUrl = url.parse(proxyUrl); +export interface KubeApiRequestArgs extends ClusterProxyApiRequestArgs { + restUrl: string; +} + +export async function kubeApiRequest({ req, socket, head, cluster, restUrl }: KubeApiRequestArgs) { + const proxyUrl = `${await cluster.contextHandler.resolveAuthProxyUrl()}/${restUrl}`; + const apiUrl = parse(cluster.apiUrl); + const pUrl = parse(proxyUrl); const connectOpts = { port: parseInt(pUrl.port), host: pUrl.hostname }; const proxySocket = new net.Socket(); diff --git a/src/main/proxy-functions/shell-api-request/shell-api-request.ts b/src/main/proxy-functions/shell-api-request/shell-api-request.ts index 8fe4d51b09..485f5bbd78 100644 --- a/src/main/proxy-functions/shell-api-request/shell-api-request.ts +++ b/src/main/proxy-functions/shell-api-request/shell-api-request.ts @@ -5,11 +5,10 @@ import logger from "../../logger"; import WebSocket, { Server as WebSocketServer } from "ws"; -import type { ProxyApiRequestArgs } from "../types"; -import { ClusterManager } from "../../cluster-manager"; -import URLParse from "url-parse"; +import type { ClusterProxyApiRequestArgs } from "../types"; import type { Cluster } from "../../../common/cluster/cluster"; import type { ClusterId } from "../../../common/cluster-types"; +import URLParse from "url-parse"; interface Dependencies { authenticateRequest: (clusterId: ClusterId, tabId: string, shellToken: string) => boolean, @@ -22,9 +21,9 @@ interface Dependencies { }) => { open: () => Promise }; } -export const shellApiRequest = ({ createShellSession, authenticateRequest }: Dependencies) => ({ req, socket, head }: ProxyApiRequestArgs): void => { - const cluster = ClusterManager.getInstance().getClusterForRequest(req); - const { query: { node: nodeName, shellToken, id: tabId }} = new URLParse(req.url, true); +export const shellApiRequest = ({ createShellSession, authenticateRequest }: Dependencies) => ({ req, socket, head, cluster }: ClusterProxyApiRequestArgs): void => { + const url = new URLParse(req.url, true); + const { query: { node: nodeName, shellToken, id: tabId }} = url; if (!cluster || !authenticateRequest(cluster.id, tabId, shellToken)) { socket.write("Invalid shell request"); diff --git a/src/main/proxy-functions/types.ts b/src/main/proxy-functions/types.ts index f8ee225bed..8b87221b56 100644 --- a/src/main/proxy-functions/types.ts +++ b/src/main/proxy-functions/types.ts @@ -11,5 +11,8 @@ export interface ProxyApiRequestArgs { req: http.IncomingMessage, socket: net.Socket, head: Buffer, +} + +export interface ClusterProxyApiRequestArgs extends ProxyApiRequestArgs { cluster: Cluster, } diff --git a/src/renderer/api/__tests__/catalog-entity-registry.test.ts b/src/renderer/api/__tests__/catalog-entity-registry.test.ts index a76dfdab82..f4dd5d0887 100644 --- a/src/renderer/api/__tests__/catalog-entity-registry.test.ts +++ b/src/renderer/api/__tests__/catalog-entity-registry.test.ts @@ -7,11 +7,17 @@ import { CatalogEntityRegistry } from "../catalog-entity-registry"; import { catalogCategoryRegistry } from "../../../common/catalog/catalog-category-registry"; import { CatalogCategory, CatalogEntityData, CatalogEntityKindData } from "../catalog-entity"; import { KubernetesCluster, WebLink } from "../../../common/catalog-entities"; -import { observable } from "mobx"; +import { observable, runInAction } from "mobx"; class TestCatalogEntityRegistry extends CatalogEntityRegistry { - replaceItems(items: Array) { - this.updateItems(items); + replaceItems(items: (CatalogEntityData & CatalogEntityKindData)[]) { + runInAction(() => { + this._entities.clear(); + + for (const item of items) { + this.addItem(item); + } + }); } } diff --git a/src/renderer/api/catalog-entity-registry.ts b/src/renderer/api/catalog-entity-registry.ts index cc6af170c8..876ad40ddc 100644 --- a/src/renderer/api/catalog-entity-registry.ts +++ b/src/renderer/api/catalog-entity-registry.ts @@ -13,10 +13,11 @@ import { Disposer, iter } from "../utils"; import { once } from "lodash"; import logger from "../../common/logger"; import { CatalogRunEvent } from "../../common/catalog/catalog-run-event"; -import { ipcRenderer } from "electron"; -import { catalogInitChannel, catalogItemsChannel, catalogEntityRunListener } from "../../common/ipc/catalog"; +import { catalogEntityRunListener } from "../../common/ipc/catalog"; import { navigate } from "../navigation"; import { isMainFrame } from "process"; +import { startCatalogEntitySync } from "./catalog-entity-sync"; +import type { RawCatalogEntity, RawCatalogEntityUpdate } from "../../common/catalog/entity-sync"; export type EntityFilter = (entity: CatalogEntity) => any; export type CatalogEntityOnBeforeRun = (event: CatalogRunEvent) => void | Promise; @@ -41,7 +42,7 @@ export class CatalogEntityRegistry { /** * Buffer for keeping entities that don't yet have CatalogCategory synced */ - protected rawEntities: (CatalogEntityData & CatalogEntityKindData)[] = []; + protected rawEntities = new Map(); constructor(private categoryRegistry: CatalogCategoryRegistry) { makeObservable(this); @@ -57,7 +58,7 @@ export class CatalogEntityRegistry { // If the entity was not found but there are rawEntities to be processed, // try to process them and return the entity. // This might happen if an extension registered a new Catalog category. - if (this.activeEntityId && !entity && this.rawEntities.length > 0) { + if (this.activeEntityId && !entity && this.rawEntities.size > 0) { this.processRawEntities(); return this.getActiveEntityById(); @@ -79,13 +80,12 @@ export class CatalogEntityRegistry { } init() { - ipcRendererOn(catalogItemsChannel, (event, items: (CatalogEntityData & CatalogEntityKindData)[]) => { - this.updateItems(items); + startCatalogEntitySync({ + delete: this.onDeleteEvent, + add: this.onAddEvent, + update: this.onUpdateEvent, }); - // Make sure that we get items ASAP and not the next time one of them changes - ipcRenderer.send(catalogInitChannel); - if (isMainFrame) { ipcRendererOn(catalogEntityRunListener, (event, id: string) => { const entity = this.getById(id); @@ -97,47 +97,51 @@ export class CatalogEntityRegistry { } } - @action updateItems(items: (CatalogEntityData & CatalogEntityKindData)[]) { - this.rawEntities.length = 0; + private onDeleteEvent = action((uid: string) => { + this._entities.delete(uid); + this.rawEntities.delete(uid); + }); - const newIds = new Set(items.map((item) => item.metadata.uid)); + private onAddEvent = (data: RawCatalogEntity) => { + this.addItem(data); + }; - for (const uid of this._entities.keys()) { - if (!newIds.has(uid)) { - this._entities.delete(uid); + private onUpdateEvent = action((uid: string, data: RawCatalogEntityUpdate) => { + const prev = this._entities.get(uid) ?? this.rawEntities.get(uid); + + if (prev) { + if (data.metadata) { + prev.metadata = data.metadata; + } + + if (data.status) { + prev.status = data.status; + } + + if (data.spec) { + prev.spec = data.spec; } } + }); - for (const item of items) { - this.updateItem(item); - } - } + @action + protected addItem(item: (CatalogEntityData & CatalogEntityKindData)) { + const entity = this.categoryRegistry.getEntityForData(item); - @action protected updateItem(item: (CatalogEntityData & CatalogEntityKindData)) { - const existing = this._entities.get(item.metadata.uid); - - if (!existing) { - const entity = this.categoryRegistry.getEntityForData(item); - - if (entity) { - this._entities.set(entity.getId(), entity); - } else { - this.rawEntities.push(item); - } + if (entity) { + this._entities.set(entity.getId(), entity); } else { - existing.metadata = item.metadata; - existing.spec = item.spec; - existing.status = item.status; + this.rawEntities.set(item.metadata.uid, item); } } protected processRawEntities() { - const items = [...this.rawEntities]; + const items = new Map(this.rawEntities); - this.rawEntities.length = 0; + this.rawEntities.clear(); - for (const item of items) { - this.updateItem(item); + for (const item of items.values()) { + this.addItem(item); } } diff --git a/src/renderer/api/catalog-entity-sync.ts b/src/renderer/api/catalog-entity-sync.ts new file mode 100644 index 0000000000..343a3bbb4f --- /dev/null +++ b/src/renderer/api/catalog-entity-sync.ts @@ -0,0 +1,41 @@ +/** + * Copyright (c) OpenLens Authors. All rights reserved. + * Licensed under MIT License. See LICENSE in root directory for more information. + */ + +import { format } from "url"; +import type { CatalogSyncMessage, EntityChangeEvents } from "../../common/catalog/entity-sync"; +import logger from "../../common/logger"; +import { apiPrefix, catalogSyncRoute } from "../../common/vars"; +import { WebSocketApi } from "./websocket-api"; + +export function startCatalogEntitySync(events: EntityChangeEvents): void { + const { hostname, protocol, port } = location; + + const socketUrl = format({ + protocol: protocol.includes("https") ? "wss" : "ws", + hostname, + port, + pathname: `${apiPrefix}${catalogSyncRoute}`, + slashes: true, + }); + + const api = new WebSocketApi(); + + api.on("data", (message) => { + const change = JSON.parse(message) as CatalogSyncMessage; + + logger.silly(`[CATALOG-SYNC]: event`, change); + + switch (change.type) { + case "add": + return events.add(change.data); + case "update": + return events.update(change.uid, change.data); + case "delete": + return events.delete(change.uid); + } + }); + + api.connect(socketUrl); +} diff --git a/src/renderer/api/terminal-api.ts b/src/renderer/api/terminal-api.ts index e05cdb91d5..ac8d45557a 100644 --- a/src/renderer/api/terminal-api.ts +++ b/src/renderer/api/terminal-api.ts @@ -12,6 +12,7 @@ import { ipcRenderer } from "electron"; import logger from "../../common/logger"; import { deserialize, serialize } from "v8"; import { once } from "lodash"; +import { apiPrefix, shellRoute } from "../../common/vars"; export enum TerminalChannels { STDIN = "stdin", @@ -96,7 +97,7 @@ export class TerminalApi extends WebSocketApi { protocol: protocol.includes("https") ? "wss" : "ws", hostname, port, - pathname: "/api", + pathname: `${apiPrefix}${shellRoute}`, query: { ...this.query, shellToken: Buffer.from(authTokenArray).toString("base64"), @@ -126,6 +127,7 @@ export class TerminalApi extends WebSocketApi { super.connect(socketUrl); this.socket.binaryType = "arraybuffer"; + this.on("close", () => this.isReady = false); } sendMessage(message: TerminalMessage) { @@ -176,11 +178,6 @@ export class TerminalApi extends WebSocketApi { super._onOpen(evt); } - protected _onClose(evt: CloseEvent) { - super._onClose(evt); - this.isReady = false; - } - protected emitStatus(data: string, options: { color?: TerminalColor; showTime?: boolean } = {}) { const { color, showTime } = options; const time = showTime ? `${(new Date()).toLocaleString()} ` : ""; diff --git a/src/renderer/api/websocket-api.ts b/src/renderer/api/websocket-api.ts index c6ca434058..329b3c2b9b 100644 --- a/src/renderer/api/websocket-api.ts +++ b/src/renderer/api/websocket-api.ts @@ -80,10 +80,13 @@ export class WebSocketApi extends (EventEmitter pingMessage: "PING", }; - constructor(params: WebsocketApiParams) { + constructor(params: WebsocketApiParams = {}) { super(); makeObservable(this); - this.params = Object.assign({}, WebSocketApi.defaultParams, params); + this.params = { + ...WebSocketApi.defaultParams, + ...params, + }; const { pingInterval } = this.params; if (pingInterval) { @@ -108,7 +111,7 @@ export class WebSocketApi extends (EventEmitter this.socket.addEventListener("open", ev => this._onOpen(ev)); this.socket.addEventListener("message", ev => this._onMessage(ev)); this.socket.addEventListener("error", ev => this._onError(ev)); - this.socket.addEventListener("close", ev => this._onClose(ev)); + this.socket.addEventListener("close", ev => this._onClose(ev, url)); this.readyState = WebSocketApiState.CONNECTING; } @@ -175,15 +178,13 @@ export class WebSocketApi extends (EventEmitter this.writeLog("%cERROR", "color:red;font-weight:bold;", evt); } - protected _onClose(evt: CloseEvent) { + protected _onClose(evt: CloseEvent, url: string) { const error = evt.code !== 1000 || !evt.wasClean; if (error) { const { reconnectDelay } = this.params; if (reconnectDelay) { - const url = this.socket.url; - this.writeLog("will reconnect in", `${reconnectDelay}s`); this.reconnectTimer = setTimeout(() => this.connect(url), reconnectDelay * 1000);