1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Convert catalog entity sync to websocket

- Send events about individual entities instead of the whole set

- Do basic diffing for updates

Signed-off-by: Sebastian Malton <sebastian@malton.name>
This commit is contained in:
Sebastian Malton 2022-02-11 08:11:49 -05:00
parent 81e6dc5d8e
commit 38211c8b1e
21 changed files with 410 additions and 134 deletions

View File

@ -207,7 +207,7 @@ export interface CatalogEntityMetadata {
description?: string;
source?: string;
labels: Record<string, string>;
[key: string]: string | object;
[key: string]: string | Record<string, string>;
}
export interface CatalogEntityStatus {
@ -220,6 +220,7 @@ export interface CatalogEntityStatus {
enabled?: boolean;
message?: string;
active?: boolean;
[key: string]: string | number | boolean;
}
export interface CatalogEntityActionContext {

View File

@ -0,0 +1,38 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import type { RequireAtLeastOne } from "type-fest";
import type { CatalogEntityData } from "./catalog-entity";
export interface EntityChangeEvents {
add: (data: RawCatalogEntity) => void;
update: (uid: string, data: RawCatalogEntityUpdate) => void;
delete: (uid: string) => void;
}
export interface RawCatalogEntity extends CatalogEntityData {
kind: string;
apiVersion: string;
}
export type RawCatalogEntityUpdate = RequireAtLeastOne<CatalogEntityData>;
export interface CatalogSyncAddMessage {
type: "add";
data: RawCatalogEntity;
}
export interface CatalogSyncUpdateMessage {
type: "update",
uid: string;
data: RawCatalogEntityUpdate;
}
export interface CatalogSyncDeleteMessage {
type: "delete",
uid: string;
}
export type CatalogSyncMessage = CatalogSyncAddMessage | CatalogSyncUpdateMessage | CatalogSyncDeleteMessage;

View File

@ -7,13 +7,3 @@
* This is used to activate a specific entity in the renderer main frame
*/
export const catalogEntityRunListener = "catalog-entity:run";
/**
* This is broadcast on whenever there is an update to any catalog item
*/
export const catalogItemsChannel = "catalog:items";
/**
* This can be sent from renderer to main to initialize a broadcast of ITEMS
*/
export const catalogInitChannel = "catalog:init";

View File

@ -10,3 +10,7 @@
export function fromEntries<T, Key extends string>(entries: Iterable<readonly [Key, T]>): { [k in Key]: T } {
return Object.fromEntries(entries) as { [k in Key]: T };
}
export function entries<Key extends string, T>(obj: Record<Key, T>): [Key, T][] {
return Object.entries(obj) as [Key, T][];
}

View File

@ -54,6 +54,8 @@ defineGlobal("__static", {
// Apis
export const apiPrefix = "/api" as string; // local router apis
export const apiKubePrefix = "/api-kube" as string; // k8s cluster apis
export const shellRoute = "/shell" as string;
export const catalogSyncRoute = "/catalog-sync" as string;
// Links
export const issuesTrackerUrl = "https://github.com/lensapp/lens/issues" as string;

View File

@ -1,28 +0,0 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { reaction } from "mobx";
import { broadcastMessage, ipcMainOn } from "../common/ipc";
import type { CatalogEntityRegistry } from "./catalog";
import "../common/catalog-entities/kubernetes-cluster";
import { disposer, toJS } from "../common/utils";
import { debounce } from "lodash";
import type { CatalogEntity } from "../common/catalog";
import { catalogInitChannel, catalogItemsChannel } from "../common/ipc/catalog";
const broadcaster = debounce((items: CatalogEntity[]) => {
broadcastMessage(catalogItemsChannel, items);
}, 1_000, { leading: true, trailing: true });
export function pushCatalogToRenderer(catalog: CatalogEntityRegistry) {
return disposer(
ipcMainOn(catalogInitChannel, () => broadcaster(toJS(catalog.items))),
reaction(() => toJS(catalog.items), (items) => {
broadcaster(items);
}, {
fireImmediately: true,
}),
);
}

View File

@ -26,13 +26,15 @@ export class CatalogEntityRegistry {
this.sources.delete(id);
}
@computed get items(): CatalogEntity[] {
return Array.from(
iter.filter(
iter.flatMap(this.sources.values(), source => source.get()),
entity => this.categoryRegistry.getCategoryForEntity(entity),
),
);
readonly entities = computed(() => Array.from(
iter.filter(
iter.flatMap(this.sources.values(), source => source.get()),
entity => this.categoryRegistry.getCategoryForEntity(entity),
),
));
get items(): CatalogEntity[] {
return this.entities.get();
}
getById<T extends CatalogEntity>(id: string): T | undefined {

View File

@ -0,0 +1,17 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable";
import catalogEntityRegistryInjectable from "./entity-registry.injectable";
const catalogEntitiesInjectable = getInjectable({
instantiate: (di) => {
const registry = di.inject(catalogEntityRegistryInjectable);
return registry.entities;
},
lifecycle: lifecycleEnum.singleton,
});
export default catalogEntitiesInjectable;

View File

@ -0,0 +1,13 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable";
import { catalogEntityRegistry } from "./catalog-entity-registry";
const catalogEntityRegistryInjectable = getInjectable({
instantiate: () => catalogEntityRegistry,
lifecycle: lifecycleEnum.singleton,
});
export default catalogEntityRegistryInjectable;

View File

@ -26,8 +26,6 @@ import { disposer, getAppVersion, getAppVersionFromProxyServer } from "../common
import { ipcMainOn } from "../common/ipc";
import { startUpdateChecking } from "./app-updater";
import { IpcRendererNavigationEvents } from "../renderer/navigation/events";
import { pushCatalogToRenderer } from "./catalog-pusher";
import { catalogEntityRegistry } from "./catalog";
import { HelmRepoManager } from "./helm/helm-repo-manager";
import { syncGeneralEntities, syncWeblinks } from "./catalog-sources";
import configurePackages from "../common/configure-packages";
@ -55,6 +53,7 @@ import routerInjectable from "./router/router.injectable";
import shellApiRequestInjectable from "./proxy-functions/shell-api-request/shell-api-request.injectable";
import userStoreInjectable from "../common/user-store/user-store.injectable";
import trayMenuItemsInjectable from "./tray/tray-menu-items.injectable";
import catalogApiRequestHandlerInjectable from "./proxy-functions/catalog-api-request/handler.injectable";
const di = getDi();
@ -63,7 +62,6 @@ app.setName(appName);
di.runSetups().then(() => {
injectSystemCAs();
const onCloseCleanup = disposer();
const onQuitCleanup = disposer();
SentryInit();
@ -163,11 +161,13 @@ di.runSetups().then(() => {
const router = di.inject(routerInjectable);
const shellApiRequest = di.inject(shellApiRequestInjectable);
const catalogApiRequest = di.inject(catalogApiRequestHandlerInjectable);
const lensProxy = LensProxy.createInstance(router, {
getClusterForRequest: (req) => ClusterManager.getInstance().getClusterForRequest(req),
kubeApiRequest,
shellApiRequest,
catalogApiRequest,
});
ClusterManager.createInstance().init();
@ -244,8 +244,6 @@ di.runSetups().then(() => {
}
ipcMainOn(IpcRendererNavigationEvents.LOADED, async () => {
onCloseCleanup.push(pushCatalogToRenderer(catalogEntityRegistry));
const directoryForKubeConfigs = di.inject(directoryForKubeConfigsInjectable);
await ensureDir(directoryForKubeConfigs);
@ -320,8 +318,6 @@ di.runSetups().then(() => {
kubeConfigSyncManager.stopSync();
onCloseCleanup();
// This is set to false here so that LPRM can wait to send future lens://
// requests until after it loads again
lensProtocolRouterMain.rendererLoaded = false;

View File

@ -10,8 +10,6 @@ import type { ClusterId } from "../../../common/cluster-types";
import { ClusterStore } from "../../../common/cluster-store/cluster-store";
import { appEventBus } from "../../../common/app-event-bus/event-bus";
import { broadcastMainChannel, broadcastMessage, ipcMainHandle, ipcMainOn } from "../../../common/ipc";
import { catalogEntityRegistry } from "../../catalog";
import { pushCatalogToRenderer } from "../../catalog-pusher";
import { ClusterManager } from "../../cluster-manager";
import { ResourceApplier } from "../../resource-applier";
import { WindowManager } from "../../window-manager";
@ -43,8 +41,6 @@ export const initIpcMainHandlers = ({ electronMenuItems, directoryForLensLocalSt
if (cluster) {
clusterFrameMap.set(cluster.id, { frameId: event.frameId, processId: event.processId });
cluster.pushState();
pushCatalogToRenderer(catalogEntityRegistry);
}
});

View File

@ -11,18 +11,20 @@ import { apiPrefix, apiKubePrefix } from "../common/vars";
import type { Router } from "./router";
import type { ContextHandler } from "./context-handler/context-handler";
import logger from "./logger";
import { Singleton } from "../common/utils";
import { entries, Singleton } from "../common/utils";
import type { Cluster } from "../common/cluster/cluster";
import type { ProxyApiRequestArgs } from "./proxy-functions";
import type { ClusterProxyApiRequestArgs, KubeApiRequestArgs, ProxyApiRequestArgs } from "./proxy-functions";
import { appEventBus } from "../common/app-event-bus/event-bus";
import { getBoolean } from "./utils/parse-query";
import { matchPath } from "react-router";
type GetClusterForRequest = (req: http.IncomingMessage) => Cluster | null;
export type GetClusterForRequest = (req: http.IncomingMessage) => Cluster | null;
export interface LensProxyFunctions {
getClusterForRequest: GetClusterForRequest,
shellApiRequest: (args: ProxyApiRequestArgs) => void | Promise<void>;
kubeApiRequest: (args: ProxyApiRequestArgs) => void | Promise<void>;
shellApiRequest: (args: ClusterProxyApiRequestArgs) => void;
catalogApiRequest: (args: ProxyApiRequestArgs) => Promise<void>;
kubeApiRequest: (args: KubeApiRequestArgs) => Promise<void>;
}
const watchParam = "watch";
@ -52,6 +54,12 @@ const disallowedPorts = new Set([
10080,
]);
enum ProxyRouteKey {
KUBE = "kube",
SHELL = "shell",
CATALOG = "catalog",
}
export class LensProxy extends Singleton {
protected origin: string;
protected proxyServer: http.Server;
@ -62,7 +70,7 @@ export class LensProxy extends Singleton {
public port: number;
constructor(protected router: Router, { shellApiRequest, kubeApiRequest, getClusterForRequest }: LensProxyFunctions) {
constructor(protected router: Router, { shellApiRequest, kubeApiRequest, getClusterForRequest, catalogApiRequest }: LensProxyFunctions) {
super();
this.getClusterForRequest = getClusterForRequest;
@ -76,19 +84,55 @@ export class LensProxy extends Singleton {
this.handleRequest(req, res);
});
this.proxyServer
.on("upgrade", (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => {
const isInternal = req.url.startsWith(`${apiPrefix}?`);
const routes: Record<ProxyRouteKey, string> = {
[ProxyRouteKey.KUBE]: `${apiKubePrefix}/:rest*`,
[ProxyRouteKey.CATALOG]: `${apiPrefix}/catalog-sync`,
[ProxyRouteKey.SHELL]: `${apiPrefix}/shell`,
};
const handlers: Record<ProxyRouteKey, (req: http.IncomingMessage, socket: net.Socket, head: Buffer, restUrl: string | undefined) => Promise<void>> = {
[ProxyRouteKey.KUBE]: (req, socket, head, restUrl) => {
req.url = `/${restUrl}`;
const cluster = getClusterForRequest(req);
if (!cluster) {
return void logger.error(`[LENS-PROXY]: Could not find cluster for upgrade request from url=${req.url}`);
}
const reqHandler = isInternal ? shellApiRequest : kubeApiRequest;
return kubeApiRequest({ req, socket, head, cluster, restUrl });
},
[ProxyRouteKey.CATALOG]: (req, socket, head) => {
return catalogApiRequest({ req, socket, head });
},
[ProxyRouteKey.SHELL]: (req, socket, head) => {
const cluster = getClusterForRequest(req);
(async () => reqHandler({ req, socket, head, cluster }))()
.catch(error => logger.error("[LENS-PROXY]: failed to handle proxy upgrade", error));
if (!cluster) {
return void logger.error(`[LENS-PROXY]: Could not find cluster for upgrade request from url=${req.url}`);
}
shellApiRequest({ req, socket, head, cluster });
},
};
this.proxyServer
.on("upgrade", async (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => {
try {
for (const [type, matcher] of entries(routes)) {
const match = matchPath<{ rest?: string }>(req.url, {
path: matcher,
exact: true,
});
if (match) {
return await handlers[type](req, socket, head, match.params.rest);
}
}
logger.warn(`[LENS-PROXY]: Tried to upgrade request with no matching handler, url=${req.url}`);
} catch (error) {
logger.error("[LENS-PROXY]: failed to handle proxy upgrade", error);
}
});
}

View File

@ -0,0 +1,147 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { getInjectable, lifecycleEnum } from "@ogre-tools/injectable";
import { isEqual } from "lodash";
import { autorun, IComputedValue } from "mobx";
import type { CatalogEntity } from "../../../common/catalog";
import { toJS } from "../../../renderer/utils";
import catalogEntitiesInjectable from "../../catalog/entities.injectable";
import type { ProxyApiRequestArgs } from "../types";
import WebSocket, { Server as WebSocketServer } from "ws";
import logger from "../../logger";
import EventEmitter from "events";
import type TypedEventEmitter from "typed-emitter";
import type { RawCatalogEntity, RawCatalogEntityUpdate, EntityChangeEvents, CatalogSyncAddMessage, CatalogSyncDeleteMessage, CatalogSyncUpdateMessage } from "../../../common/catalog/entity-sync";
interface Dependencies {
entities: IComputedValue<CatalogEntity[]>;
}
function toRaw(entity: CatalogEntity): RawCatalogEntity {
return {
kind: entity.kind,
apiVersion: entity.apiVersion,
metadata: toJS(entity.metadata),
status: toJS(entity.status),
spec: toJS(entity.spec),
};
}
function createRawEntityUpdate(prevRaw: RawCatalogEntity, rawEntity: RawCatalogEntity): RawCatalogEntityUpdate | false {
const metadata = isEqual(prevRaw.metadata, rawEntity.metadata)
? {}
: { metadata: rawEntity.metadata };
const status = isEqual(prevRaw.status, rawEntity.status)
? {}
: { status: rawEntity.status };
const spec = isEqual(prevRaw.spec, rawEntity.spec)
? {}
: { spec: rawEntity.spec };
const res = {
...metadata,
...status,
...spec,
};
if (!res.metadata && !res.spec && !res.status) {
return false;
}
return res as RawCatalogEntityUpdate;
}
function wrapWebsocketForChangeEvents(websocket: WebSocket): EntityChangeEvents {
return {
add: (data) => {
websocket.send(JSON.stringify({
data,
type: "add",
} as CatalogSyncAddMessage));
},
delete: (uid) => {
websocket.send(JSON.stringify({
uid,
type: "delete",
} as CatalogSyncDeleteMessage));
},
update: (uid, data) => {
websocket.send(JSON.stringify({
uid,
data,
type: "update",
} as CatalogSyncUpdateMessage));
},
};
}
const catalogApiRequestHandler = ({ entities }: Dependencies) => {
const rawEntityMap = new Map<string, RawCatalogEntity>();
const entityChangeEmitter = new EventEmitter() as TypedEventEmitter<EntityChangeEvents>;
autorun(() => {
const currentIds = new Set<string>();
for (const entity of entities.get()) {
currentIds.add(entity.getId());
const rawEntity = toRaw(entity);
if (rawEntityMap.has(rawEntity.metadata.uid)) {
const prevRaw = rawEntityMap.get(rawEntity.metadata.uid);
const diff = createRawEntityUpdate(prevRaw, rawEntity);
if (diff) {
rawEntityMap.set(rawEntity.metadata.uid, rawEntity);
entityChangeEmitter.emit("update", rawEntity.metadata.uid, diff);
}
} else {
rawEntityMap.set(rawEntity.metadata.uid, rawEntity);
entityChangeEmitter.emit("add", rawEntity);
}
}
for (const rawEntityId of rawEntityMap.keys()) {
if (!currentIds.has(rawEntityId)) {
rawEntityMap.delete(rawEntityId);
entityChangeEmitter.emit("delete", rawEntityId);
}
}
});
return async ({ req, socket, head }: ProxyApiRequestArgs): Promise<void> => {
const ws = new WebSocketServer({ noServer: true });
return ws.handleUpgrade(req, socket, head, (websocket) => {
logger.info("[CATALOG-SYNC]: starting new catalog entity sync");
const events = wrapWebsocketForChangeEvents(websocket);
for (const rawEntity of rawEntityMap.values()) {
// initialize with current values
events.add(rawEntity);
}
// Set up passing changes on
entityChangeEmitter.on("add", events.add);
entityChangeEmitter.on("update", events.update);
entityChangeEmitter.on("delete", events.delete);
websocket.on("close", () => {
entityChangeEmitter.off("add", events.add);
entityChangeEmitter.off("update", events.update);
entityChangeEmitter.off("delete", events.delete);
});
});
};
};
const catalogApiRequestHandlerInjectable = getInjectable({
instantiate: (di) => catalogApiRequestHandler({
entities: di.inject(catalogEntitiesInjectable),
}),
lifecycle: lifecycleEnum.singleton,
});
export default catalogApiRequestHandlerInjectable;

View File

@ -5,16 +5,19 @@
import { chunk } from "lodash";
import net from "net";
import url from "url";
import { apiKubePrefix } from "../../common/vars";
import type { ProxyApiRequestArgs } from "./types";
import { parse } from "url";
import type { ClusterProxyApiRequestArgs } from ".";
const skipRawHeaders = new Set(["Host", "Authorization"]);
export async function kubeApiRequest({ req, socket, head, cluster }: ProxyApiRequestArgs) {
const proxyUrl = await cluster.contextHandler.resolveAuthProxyUrl() + req.url.replace(apiKubePrefix, "");
const apiUrl = url.parse(cluster.apiUrl);
const pUrl = url.parse(proxyUrl);
export interface KubeApiRequestArgs extends ClusterProxyApiRequestArgs {
restUrl: string;
}
export async function kubeApiRequest({ req, socket, head, cluster, restUrl }: KubeApiRequestArgs) {
const proxyUrl = `${await cluster.contextHandler.resolveAuthProxyUrl()}/${restUrl}`;
const apiUrl = parse(cluster.apiUrl);
const pUrl = parse(proxyUrl);
const connectOpts = { port: parseInt(pUrl.port), host: pUrl.hostname };
const proxySocket = new net.Socket();

View File

@ -5,11 +5,10 @@
import logger from "../../logger";
import WebSocket, { Server as WebSocketServer } from "ws";
import type { ProxyApiRequestArgs } from "../types";
import { ClusterManager } from "../../cluster-manager";
import URLParse from "url-parse";
import type { ClusterProxyApiRequestArgs } from "../types";
import type { Cluster } from "../../../common/cluster/cluster";
import type { ClusterId } from "../../../common/cluster-types";
import URLParse from "url-parse";
interface Dependencies {
authenticateRequest: (clusterId: ClusterId, tabId: string, shellToken: string) => boolean,
@ -22,9 +21,9 @@ interface Dependencies {
}) => { open: () => Promise<void> };
}
export const shellApiRequest = ({ createShellSession, authenticateRequest }: Dependencies) => ({ req, socket, head }: ProxyApiRequestArgs): void => {
const cluster = ClusterManager.getInstance().getClusterForRequest(req);
const { query: { node: nodeName, shellToken, id: tabId }} = new URLParse(req.url, true);
export const shellApiRequest = ({ createShellSession, authenticateRequest }: Dependencies) => ({ req, socket, head, cluster }: ClusterProxyApiRequestArgs): void => {
const url = new URLParse(req.url, true);
const { query: { node: nodeName, shellToken, id: tabId }} = url;
if (!cluster || !authenticateRequest(cluster.id, tabId, shellToken)) {
socket.write("Invalid shell request");

View File

@ -11,5 +11,8 @@ export interface ProxyApiRequestArgs {
req: http.IncomingMessage,
socket: net.Socket,
head: Buffer,
}
export interface ClusterProxyApiRequestArgs extends ProxyApiRequestArgs {
cluster: Cluster,
}

View File

@ -7,11 +7,17 @@ import { CatalogEntityRegistry } from "../catalog-entity-registry";
import { catalogCategoryRegistry } from "../../../common/catalog/catalog-category-registry";
import { CatalogCategory, CatalogEntityData, CatalogEntityKindData } from "../catalog-entity";
import { KubernetesCluster, WebLink } from "../../../common/catalog-entities";
import { observable } from "mobx";
import { observable, runInAction } from "mobx";
class TestCatalogEntityRegistry extends CatalogEntityRegistry {
replaceItems(items: Array<CatalogEntityData & CatalogEntityKindData>) {
this.updateItems(items);
replaceItems(items: (CatalogEntityData & CatalogEntityKindData)[]) {
runInAction(() => {
this._entities.clear();
for (const item of items) {
this.addItem(item);
}
});
}
}

View File

@ -13,10 +13,11 @@ import { Disposer, iter } from "../utils";
import { once } from "lodash";
import logger from "../../common/logger";
import { CatalogRunEvent } from "../../common/catalog/catalog-run-event";
import { ipcRenderer } from "electron";
import { catalogInitChannel, catalogItemsChannel, catalogEntityRunListener } from "../../common/ipc/catalog";
import { catalogEntityRunListener } from "../../common/ipc/catalog";
import { navigate } from "../navigation";
import { isMainFrame } from "process";
import { startCatalogEntitySync } from "./catalog-entity-sync";
import type { RawCatalogEntity, RawCatalogEntityUpdate } from "../../common/catalog/entity-sync";
export type EntityFilter = (entity: CatalogEntity) => any;
export type CatalogEntityOnBeforeRun = (event: CatalogRunEvent) => void | Promise<void>;
@ -41,7 +42,7 @@ export class CatalogEntityRegistry {
/**
* Buffer for keeping entities that don't yet have CatalogCategory synced
*/
protected rawEntities: (CatalogEntityData & CatalogEntityKindData)[] = [];
protected rawEntities = new Map<string, CatalogEntityData & CatalogEntityKindData>();
constructor(private categoryRegistry: CatalogCategoryRegistry) {
makeObservable(this);
@ -57,7 +58,7 @@ export class CatalogEntityRegistry {
// If the entity was not found but there are rawEntities to be processed,
// try to process them and return the entity.
// This might happen if an extension registered a new Catalog category.
if (this.activeEntityId && !entity && this.rawEntities.length > 0) {
if (this.activeEntityId && !entity && this.rawEntities.size > 0) {
this.processRawEntities();
return this.getActiveEntityById();
@ -79,13 +80,12 @@ export class CatalogEntityRegistry {
}
init() {
ipcRendererOn(catalogItemsChannel, (event, items: (CatalogEntityData & CatalogEntityKindData)[]) => {
this.updateItems(items);
startCatalogEntitySync({
delete: this.onDeleteEvent,
add: this.onAddEvent,
update: this.onUpdateEvent,
});
// Make sure that we get items ASAP and not the next time one of them changes
ipcRenderer.send(catalogInitChannel);
if (isMainFrame) {
ipcRendererOn(catalogEntityRunListener, (event, id: string) => {
const entity = this.getById(id);
@ -97,47 +97,51 @@ export class CatalogEntityRegistry {
}
}
@action updateItems(items: (CatalogEntityData & CatalogEntityKindData)[]) {
this.rawEntities.length = 0;
private onDeleteEvent = action((uid: string) => {
this._entities.delete(uid);
this.rawEntities.delete(uid);
});
const newIds = new Set(items.map((item) => item.metadata.uid));
private onAddEvent = (data: RawCatalogEntity) => {
this.addItem(data);
};
for (const uid of this._entities.keys()) {
if (!newIds.has(uid)) {
this._entities.delete(uid);
private onUpdateEvent = action((uid: string, data: RawCatalogEntityUpdate) => {
const prev = this._entities.get(uid) ?? this.rawEntities.get(uid);
if (prev) {
if (data.metadata) {
prev.metadata = data.metadata;
}
if (data.status) {
prev.status = data.status;
}
if (data.spec) {
prev.spec = data.spec;
}
}
});
for (const item of items) {
this.updateItem(item);
}
}
@action
protected addItem(item: (CatalogEntityData & CatalogEntityKindData)) {
const entity = this.categoryRegistry.getEntityForData(item);
@action protected updateItem(item: (CatalogEntityData & CatalogEntityKindData)) {
const existing = this._entities.get(item.metadata.uid);
if (!existing) {
const entity = this.categoryRegistry.getEntityForData(item);
if (entity) {
this._entities.set(entity.getId(), entity);
} else {
this.rawEntities.push(item);
}
if (entity) {
this._entities.set(entity.getId(), entity);
} else {
existing.metadata = item.metadata;
existing.spec = item.spec;
existing.status = item.status;
this.rawEntities.set(item.metadata.uid, item);
}
}
protected processRawEntities() {
const items = [...this.rawEntities];
const items = new Map(this.rawEntities);
this.rawEntities.length = 0;
this.rawEntities.clear();
for (const item of items) {
this.updateItem(item);
for (const item of items.values()) {
this.addItem(item);
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright (c) OpenLens Authors. All rights reserved.
* Licensed under MIT License. See LICENSE in root directory for more information.
*/
import { format } from "url";
import type { CatalogSyncMessage, EntityChangeEvents } from "../../common/catalog/entity-sync";
import logger from "../../common/logger";
import { apiPrefix, catalogSyncRoute } from "../../common/vars";
import { WebSocketApi } from "./websocket-api";
export function startCatalogEntitySync(events: EntityChangeEvents): void {
const { hostname, protocol, port } = location;
const socketUrl = format({
protocol: protocol.includes("https") ? "wss" : "ws",
hostname,
port,
pathname: `${apiPrefix}${catalogSyncRoute}`,
slashes: true,
});
const api = new WebSocketApi();
api.on("data", (message) => {
const change = JSON.parse(message) as CatalogSyncMessage;
logger.silly(`[CATALOG-SYNC]: event`, change);
switch (change.type) {
case "add":
return events.add(change.data);
case "update":
return events.update(change.uid, change.data);
case "delete":
return events.delete(change.uid);
}
});
api.connect(socketUrl);
}

View File

@ -12,6 +12,7 @@ import { ipcRenderer } from "electron";
import logger from "../../common/logger";
import { deserialize, serialize } from "v8";
import { once } from "lodash";
import { apiPrefix, shellRoute } from "../../common/vars";
export enum TerminalChannels {
STDIN = "stdin",
@ -96,7 +97,7 @@ export class TerminalApi extends WebSocketApi<TerminalEvents> {
protocol: protocol.includes("https") ? "wss" : "ws",
hostname,
port,
pathname: "/api",
pathname: `${apiPrefix}${shellRoute}`,
query: {
...this.query,
shellToken: Buffer.from(authTokenArray).toString("base64"),
@ -126,6 +127,7 @@ export class TerminalApi extends WebSocketApi<TerminalEvents> {
super.connect(socketUrl);
this.socket.binaryType = "arraybuffer";
this.on("close", () => this.isReady = false);
}
sendMessage(message: TerminalMessage) {
@ -176,11 +178,6 @@ export class TerminalApi extends WebSocketApi<TerminalEvents> {
super._onOpen(evt);
}
protected _onClose(evt: CloseEvent) {
super._onClose(evt);
this.isReady = false;
}
protected emitStatus(data: string, options: { color?: TerminalColor; showTime?: boolean } = {}) {
const { color, showTime } = options;
const time = showTime ? `${(new Date()).toLocaleString()} ` : "";

View File

@ -80,10 +80,13 @@ export class WebSocketApi<Events extends WebSocketEvents> extends (EventEmitter
pingMessage: "PING",
};
constructor(params: WebsocketApiParams) {
constructor(params: WebsocketApiParams = {}) {
super();
makeObservable(this);
this.params = Object.assign({}, WebSocketApi.defaultParams, params);
this.params = {
...WebSocketApi.defaultParams,
...params,
};
const { pingInterval } = this.params;
if (pingInterval) {
@ -108,7 +111,7 @@ export class WebSocketApi<Events extends WebSocketEvents> extends (EventEmitter
this.socket.addEventListener("open", ev => this._onOpen(ev));
this.socket.addEventListener("message", ev => this._onMessage(ev));
this.socket.addEventListener("error", ev => this._onError(ev));
this.socket.addEventListener("close", ev => this._onClose(ev));
this.socket.addEventListener("close", ev => this._onClose(ev, url));
this.readyState = WebSocketApiState.CONNECTING;
}
@ -175,15 +178,13 @@ export class WebSocketApi<Events extends WebSocketEvents> extends (EventEmitter
this.writeLog("%cERROR", "color:red;font-weight:bold;", evt);
}
protected _onClose(evt: CloseEvent) {
protected _onClose(evt: CloseEvent, url: string) {
const error = evt.code !== 1000 || !evt.wasClean;
if (error) {
const { reconnectDelay } = this.params;
if (reconnectDelay) {
const url = this.socket.url;
this.writeLog("will reconnect in", `${reconnectDelay}s`);
this.reconnectTimer = setTimeout(() => this.connect(url), reconnectDelay * 1000);