diff --git a/src/main/shell-session/node-shell-session.ts b/src/main/shell-session/node-shell-session.ts index 4ab1d97745..1f1a6cab03 100644 --- a/src/main/shell-session/node-shell-session.ts +++ b/src/main/shell-session/node-shell-session.ts @@ -29,6 +29,7 @@ import { get } from "lodash"; import { Node, NodesApi } from "../../common/k8s-api/endpoints"; import { KubeJsonApi } from "../../common/k8s-api/kube-json-api"; import logger from "../logger"; +import { TerminalChannels } from "../../renderer/api/terminal-api"; export class NodeShellSession extends ShellSession { ShellType = "node-shell"; @@ -51,7 +52,10 @@ export class NodeShellSession extends ShellSession { await this.waitForRunningPod(); } catch (error) { this.deleteNodeShellPod(); - this.sendResponse(`Error occurred: ${get(error, "response.body.message", error?.toString() || "unknown error")}`); + this.send({ + type: TerminalChannels.STDOUT, + data: `Error occurred: ${get(error, "response.body.message", error?.toString() || "unknown error")}`, + }); throw new ShellOpenError("failed to create node pod", error); } diff --git a/src/main/shell-session/shell-session.ts b/src/main/shell-session/shell-session.ts index bfd9a1601e..0fb9b3ef11 100644 --- a/src/main/shell-session/shell-session.ts +++ b/src/main/shell-session/shell-session.ts @@ -32,6 +32,8 @@ import { UserStore } from "../../common/user-store"; import * as pty from "node-pty"; import { appEventBus } from "../../common/event-bus"; import logger from "../logger"; +import { TerminalChannels, TerminalMessage } from "../../renderer/api/terminal-api"; +import { deserialize, serialize } from "v8"; export class ShellOpenError extends Error { constructor(message: string, public cause: Error) { @@ -145,18 +147,24 @@ export abstract class ShellSession { protected abstract get cwd(): string | undefined; - protected ensureShellProcess(shell: string, args: string[], env: Record, cwd: string): pty.IPty { - if (!ShellSession.processes.has(this.terminalId)) { + protected ensureShellProcess(shell: string, args: string[], env: Record, cwd: string): { shellProcess: pty.IPty, resume: boolean } { + const resume = ShellSession.processes.has(this.terminalId); + + if (!resume) { ShellSession.processes.set(this.terminalId, pty.spawn(shell, args, { + rows: 30, cols: 80, cwd, env, name: "xterm-256color", - rows: 30, })); } - return ShellSession.processes.get(this.terminalId); + const shellProcess = ShellSession.processes.get(this.terminalId); + + logger.info(`[SHELL-SESSION]: PTY for ${this.terminalId} is ${resume ? "resumed" : "started"} with PID=${shellProcess.pid}`); + + return { shellProcess, resume }; } constructor(protected websocket: WebSocket, protected cluster: Cluster, terminalId: string) { @@ -166,56 +174,88 @@ export abstract class ShellSession { this.terminalId = `${cluster.id}:${terminalId}`; } + protected send(message: TerminalMessage): void { + this.websocket.send(serialize(message)); + } + protected async openShellProcess(shell: string, args: string[], env: Record) { const cwd = (this.cwd && await fse.pathExists(this.cwd)) ? this.cwd : env.HOME; - const shellProcess = this.ensureShellProcess(shell, args, env, cwd); + const { shellProcess, resume } = this.ensureShellProcess(shell, args, env, cwd); + + if (resume) { + this.send({ type: TerminalChannels.CONNECTED }); + } this.running = true; - shellProcess.onData(data => this.sendResponse(data)); + shellProcess.onData(data => this.send({ type: TerminalChannels.STDOUT, data })); shellProcess.onExit(({ exitCode }) => { - this.running = false; + logger.info(`[SHELL-SESSION]: shell has exited for ${this.terminalId} closed with exitcode=${exitCode}`); - if (exitCode > 0) { - this.sendResponse("Terminal will auto-close in 15 seconds ..."); - setTimeout(() => this.exit(), 15 * 1000); - } else { - this.exit(); + // This might already be false because of the kill() within the websocket.on("close") handler + if (this.running) { + this.running = false; + + if (exitCode > 0) { + this.send({ type: TerminalChannels.STDOUT, data: "Terminal will auto-close in 15 seconds ..." }); + setTimeout(() => this.exit(), 15 * 1000); + } else { + this.exit(); + } } }); this.websocket - .on("message", (data: string) => { + .on("message", (data: string | Uint8Array) => { if (!this.running) { - return; + return void logger.debug(`[SHELL-SESSION]: received message from ${this.terminalId}, but shellProcess isn't running`); } - const message = Buffer.from(data.slice(1, data.length), "base64").toString(); + if (typeof data === "string") { + return void logger.silly(`[SHELL-SESSION]: Received message from ${this.terminalId}`, { data }); + } - switch (data[0]) { - case "0": - shellProcess.write(message); - break; - case "4": - const { Width, Height } = JSON.parse(message); + try { + const message: TerminalMessage = deserialize(data); - shellProcess.resize(Width, Height); - break; + switch (message.type) { + case TerminalChannels.STDIN: + shellProcess.write(message.data); + break; + case TerminalChannels.RESIZE: + shellProcess.resize(message.data.width, message.data.height); + break; + default: + logger.warn(`[SHELL-SESSION]: unknown or unhandleable message type for ${this.terminalId}`, message); + break; + } + } catch (error) { + logger.error(`[SHELL-SESSION]: failed to handle message for ${this.terminalId}`, error); } }) - .on("close", (code) => { - logger.debug(`[SHELL-SESSION]: websocket for ${this.terminalId} closed with code=${code}`); + .on("close", code => { + logger.info(`[SHELL-SESSION]: websocket for ${this.terminalId} closed with code=${WebSocketCloseEvent[code]}(${code})`, { cluster: this.cluster.getMeta() }); - if (this.running && code !== WebSocketCloseEvent.AbnormalClosure) { - // This code is the one that gets sent when the network is turned off - try { - logger.info(`[SHELL-SESSION]: Killing shell process for ${this.terminalId}`); - process.kill(shellProcess.pid); - ShellSession.processes.delete(this.terminalId); - } catch (e) { - } + const stopShellSession = this.running + && ( + ( + code !== WebSocketCloseEvent.AbnormalClosure + && code !== WebSocketCloseEvent.GoingAway + ) + || this.cluster.disconnected + ); + + if (stopShellSession) { this.running = false; + + try { + logger.info(`[SHELL-SESSION]: Killing shell process (pid=${shellProcess.pid}) for ${this.terminalId}`); + shellProcess.kill(); + ShellSession.processes.delete(this.terminalId); + } catch (error) { + logger.warn(`[SHELL-SESSION]: failed to kill shell process (pid=${shellProcess.pid}) for ${this.terminalId}`, error); + } } }); @@ -300,8 +340,4 @@ export abstract class ShellSession { this.websocket.close(code); } } - - protected sendResponse(msg: string) { - this.websocket.send(`1${Buffer.from(msg).toString("base64")}`); - } } diff --git a/src/renderer/api/terminal-api.ts b/src/renderer/api/terminal-api.ts index 779d729934..8cca03538b 100644 --- a/src/renderer/api/terminal-api.ts +++ b/src/renderer/api/terminal-api.ts @@ -19,22 +19,39 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -import { boundMethod, base64, EventEmitter, getHostedClusterId } from "../utils"; -import { WebSocketApi } from "./websocket-api"; +import { getHostedClusterId } from "../utils"; +import { WebSocketApi, WebSocketEvents } from "./websocket-api"; import isEqual from "lodash/isEqual"; -import { isDevelopment } from "../../common/vars"; import url from "url"; import { makeObservable, observable } from "mobx"; import { ipcRenderer } from "electron"; +import logger from "../../common/logger"; +import { deserialize, serialize } from "v8"; +import { once } from "lodash"; export enum TerminalChannels { - STDIN = 0, - STDOUT = 1, - STDERR = 2, - TERMINAL_SIZE = 4, - TOKEN = 9, + STDIN = "stdin", + STDOUT = "stdout", + CONNECTED = "connected", + RESIZE = "resize", } +export type TerminalMessage = { + type: TerminalChannels.STDIN, + data: string, +} | { + type: TerminalChannels.STDOUT, + data: string, +} | { + type: TerminalChannels.CONNECTED +} | { + type: TerminalChannels.RESIZE, + data: { + width: number, + height: number, + }, +}; + enum TerminalColor { RED = "\u001b[31m", GREEN = "\u001b[32m", @@ -53,17 +70,20 @@ export type TerminalApiQuery = Record & { type?: string; }; -export class TerminalApi extends WebSocketApi { - protected size: { Width: number; Height: number }; +export interface TerminalEvents extends WebSocketEvents { + ready: () => void; + connected: () => void; +} + +export class TerminalApi extends WebSocketApi { + protected size: { width: number; height: number }; - public onReady = new EventEmitter<[]>(); @observable public isReady = false; constructor(protected query: TerminalApiQuery) { super({ - logging: isDevelopment, flushOnOpen: false, - pingIntervalSeconds: 30, + pingInterval: 30, }); makeObservable(this); @@ -100,56 +120,81 @@ export class TerminalApi extends WebSocketApi { slashes: true, }); - this.onData.addListener(this._onReady, { prepend: true }); + const onReady = once((data?: string) => { + this.isReady = true; + this.emit("ready"); + this.removeListener("data", onReady); + this.removeListener("connected", onReady); + this.flush(); + + // data is undefined if the event that was handled is "connected" + if (data === undefined) { + /** + * Output the last line, the makes sure that the terminal isn't completely + * empty when the user refreshes. + */ + this.emit("data", window.localStorage.getItem(`${this.query.id}:last-data`)); + } + }); + + this.prependListener("data", onReady); + this.prependListener("connected", onReady); + super.connect(socketUrl); + this.socket.binaryType = "arraybuffer"; } destroy() { if (!this.socket) return; - const exitCode = String.fromCharCode(4); // ctrl+d + const controlCode = String.fromCharCode(4); // ctrl+d - this.sendCommand(exitCode); + this.sendMessage({ type: TerminalChannels.STDIN, data: controlCode }); setTimeout(() => super.destroy(), 2000); } - removeAllListeners() { - super.removeAllListeners(); - this.onReady.removeAllListeners(); - } - - @boundMethod - protected _onReady(data: string) { - if (!data) return true; - this.isReady = true; - this.onReady.emit(); - this.onData.removeListener(this._onReady); - this.flush(); - this.onData.emit(data); // re-emit data - - return false; // prevent calling rest of listeners - } - reconnect() { super.reconnect(); } - sendCommand(key: string, channel = TerminalChannels.STDIN) { - return this.send(channel + base64.encode(key)); + sendMessage(message: TerminalMessage) { + return this.send(serialize(message)); } sendTerminalSize(cols: number, rows: number) { - const newSize = { Width: cols, Height: rows }; + const newSize = { width: cols, height: rows }; if (!isEqual(this.size, newSize)) { - this.sendCommand(JSON.stringify(newSize), TerminalChannels.TERMINAL_SIZE); + this.sendMessage({ + type: TerminalChannels.RESIZE, + data: newSize, + }); this.size = newSize; } } - protected parseMessage(data: string) { - data = data.substr(1); // skip channel + protected _onMessage({ data, ...evt }: MessageEvent): void { + try { + const message: TerminalMessage = deserialize(new Uint8Array(data)); - return base64.decode(data); + switch (message.type) { + case TerminalChannels.STDOUT: + /** + * save the last data for reconnections. User localStorage because we + * don't want this data to survive if the app is closed + */ + window.localStorage.setItem(`${this.query.id}:last-data`, message.data); + super._onMessage({ data: message.data, ...evt }); + break; + case TerminalChannels.CONNECTED: + this.emit("connected"); + break; + default: + logger.warn(`[TERMINAL-API]: unknown or unhandleable message type`, message); + break; + } + } catch (error) { + logger.error(`[TERMINAL-API]: failed to handle message`, error); + } } protected _onOpen(evt: Event) { @@ -166,16 +211,13 @@ export class TerminalApi extends WebSocketApi { protected emitStatus(data: string, options: { color?: TerminalColor; showTime?: boolean } = {}) { const { color, showTime } = options; + const time = showTime ? `${(new Date()).toLocaleString()} ` : ""; if (color) { data = `${color}${data}${TerminalColor.NO_COLOR}`; } - let time; - if (showTime) { - time = `${(new Date()).toLocaleString()} `; - } - this.onData.emit(`${showTime ? time : ""}${data}\r\n`); + this.emit("data", `${time}${data}\r\n`); } protected emitError(error: string) { diff --git a/src/renderer/api/websocket-api.ts b/src/renderer/api/websocket-api.ts index fbe5142360..7b9144d325 100644 --- a/src/renderer/api/websocket-api.ts +++ b/src/renderer/api/websocket-api.ts @@ -20,20 +20,48 @@ */ import { observable, makeObservable } from "mobx"; -import { EventEmitter } from "../../common/event-emitter"; +import EventEmitter from "events"; +import type TypedEventEmitter from "typed-emitter"; +import type { Arguments } from "typed-emitter"; +import { isDevelopment } from "../../common/vars"; -interface IParams { - url?: string; // connection url, starts with ws:// or wss:// - autoConnect?: boolean; // auto-connect in constructor - flushOnOpen?: boolean; // flush pending commands on open socket - reconnectDelaySeconds?: number; // reconnect timeout in case of error (0 - don't reconnect) - pingIntervalSeconds?: number; // send ping message for keeping connection alive in some env, e.g. AWS (0 - don't ping) - logging?: boolean; // show logs in console -} +interface WebsocketApiParams { + /** + * Flush pending commands on open socket + * + * @default true + */ + flushOnOpen?: boolean; -interface IMessage { - id: string; - data: string; + /** + * In case of an error, wait this many seconds before reconnecting. + * + * If falsy, don't reconnect + * + * @default 10 + */ + reconnectDelay?: number; + + /** + * The message for pinging the websocket + * + * @default "PING" + */ + pingMessage?: string | ArrayBufferLike | Blob | ArrayBufferView; + + /** + * If set to a number > 0, then the API will ping the socket on that interval. + * + * @unit seconds + */ + pingInterval?: number; + + /** + * Whether to show logs in the console + * + * @default isDevelopment + */ + logging?: boolean; } export enum WebSocketApiState { @@ -44,79 +72,74 @@ export enum WebSocketApiState { CLOSED = "closed", } -export class WebSocketApi { +export interface WebSocketEvents { + open: () => void, + data: (message: string) => void; + close: () => void; +} + +type Defaulted = Required> & Omit; + +export class WebSocketApi extends (EventEmitter as { new(): TypedEventEmitter }) { protected socket: WebSocket; - protected pendingCommands: IMessage[] = []; - protected reconnectTimer: any; - protected pingTimer: any; - protected pingMessage = "PING"; + protected pendingCommands: (string | ArrayBufferLike | Blob | ArrayBufferView)[] = []; + protected reconnectTimer?: any; + protected pingTimer?: any; + protected params: Defaulted; @observable readyState = WebSocketApiState.PENDING; - public onOpen = new EventEmitter<[]>(); - public onData = new EventEmitter<[string]>(); - public onClose = new EventEmitter<[]>(); - - static defaultParams: Partial = { - autoConnect: true, - logging: false, - reconnectDelaySeconds: 10, - pingIntervalSeconds: 0, + private static defaultParams = { + logging: isDevelopment, + reconnectDelay: 10, flushOnOpen: true, + pingMessage: "PING", }; - constructor(protected params: IParams) { + constructor(params: WebsocketApiParams) { + super(); makeObservable(this); this.params = Object.assign({}, WebSocketApi.defaultParams, params); - const { autoConnect, pingIntervalSeconds } = this.params; + const { pingInterval } = this.params; - if (autoConnect) { - setTimeout(() => this.connect()); - } - - if (pingIntervalSeconds) { - this.pingTimer = setInterval(() => this.ping(), pingIntervalSeconds * 1000); + if (pingInterval) { + this.pingTimer = setInterval(() => this.ping(), pingInterval * 1000); } } get isConnected() { - const state = this.socket ? this.socket.readyState : -1; - - return state === WebSocket.OPEN && this.isOnline; + return this.socket?.readyState === WebSocket.OPEN && this.isOnline; } get isOnline() { return navigator.onLine; } - setParams(params: Partial) { - Object.assign(this.params, params); - } + connect(url: string) { + // close previous connection first + this.socket?.close(); - connect(url = this.params.url) { - if (this.socket) { - this.socket.close(); // close previous connection first - } + // start new connection this.socket = new WebSocket(url); - this.socket.onopen = this._onOpen.bind(this); - this.socket.onmessage = this._onMessage.bind(this); - this.socket.onerror = this._onError.bind(this); - this.socket.onclose = this._onClose.bind(this); + this.socket.addEventListener("open", ev => this._onOpen(ev)); + this.socket.addEventListener("message", ev => this._onMessage(ev)); + this.socket.addEventListener("error", ev => this._onError(ev)); + this.socket.addEventListener("close", ev => this._onClose(ev)); this.readyState = WebSocketApiState.CONNECTING; } ping() { - if (!this.isConnected) return; - this.send(this.pingMessage); + if (this.isConnected) { + this.send(this.params.pingMessage); + } } - reconnect() { - const { reconnectDelaySeconds } = this.params; + reconnect(): void { + if (!this.socket) { + return void console.error("[WEBSOCKET-API]: cannot reconnect to a socket that is not connected"); + } - if (!reconnectDelaySeconds) return; - this.writeLog("reconnect after", `${reconnectDelaySeconds}ms`); - this.reconnectTimer = setTimeout(() => this.connect(), reconnectDelaySeconds * 1000); - this.readyState = WebSocketApiState.RECONNECTING; + this.connect(this.socket.url); } destroy() { @@ -124,52 +147,43 @@ export class WebSocketApi { this.socket.close(); this.socket = null; this.pendingCommands = []; - this.removeAllListeners(); + this.clearAllListeners(); clearTimeout(this.reconnectTimer); clearInterval(this.pingTimer); this.readyState = WebSocketApiState.PENDING; } - removeAllListeners() { - this.onOpen.removeAllListeners(); - this.onData.removeAllListeners(); - this.onClose.removeAllListeners(); + clearAllListeners() { + for (const name of this.eventNames()) { + this.removeAllListeners(name as keyof Events); + } } - send(command: string) { - const msg: IMessage = { - id: (Math.random() * Date.now()).toString(16).replace(".", ""), - data: command, - }; - + send(command: string | ArrayBufferLike | Blob | ArrayBufferView) { if (this.isConnected) { - this.socket.send(msg.data); - } - else { - this.pendingCommands.push(msg); + this.socket.send(command); + } else { + this.pendingCommands.push(command); } } protected flush() { - this.pendingCommands.forEach(msg => this.send(msg.data)); + for (const command of this.pendingCommands) { + this.send(command); + } + this.pendingCommands.length = 0; } - protected parseMessage(data: string) { - return data; - } - protected _onOpen(evt: Event) { - this.onOpen.emit(); + this.emit("open", ...[] as Arguments); if (this.params.flushOnOpen) this.flush(); this.readyState = WebSocketApiState.OPEN; this.writeLog("%cOPEN", "color:green;font-weight:bold;", evt); } - protected _onMessage(evt: MessageEvent) { - const data = this.parseMessage(evt.data); - - this.onData.emit(data); + protected _onMessage({ data }: MessageEvent): void { + this.emit("data", ...[data] as Arguments); this.writeLog("%cMESSAGE", "color:black;font-weight:bold;", data); } @@ -181,18 +195,26 @@ export class WebSocketApi { const error = evt.code !== 1000 || !evt.wasClean; if (error) { - this.reconnect(); - } - else { + const { reconnectDelay } = this.params; + + if (reconnectDelay) { + const url = this.socket.url; + + this.writeLog("will reconnect in", `${reconnectDelay}s`); + + this.reconnectTimer = setTimeout(() => this.connect(url), reconnectDelay * 1000); + this.readyState = WebSocketApiState.RECONNECTING; + } + } else { this.readyState = WebSocketApiState.CLOSED; - this.onClose.emit(); + this.emit("close", ...[] as Arguments); } this.writeLog("%cCLOSE", `color:${error ? "red" : "black"};font-weight:bold;`, evt); } protected writeLog(...data: any[]) { if (this.params.logging) { - console.log(...data); + console.debug(...data); } } } diff --git a/src/renderer/components/dock/terminal.store.ts b/src/renderer/components/dock/terminal.store.ts index 0f4ec7e8ba..bc6bda61e8 100644 --- a/src/renderer/components/dock/terminal.store.ts +++ b/src/renderer/components/dock/terminal.store.ts @@ -22,7 +22,7 @@ import { autorun, observable, when } from "mobx"; import { autoBind, noop, Singleton } from "../../utils"; import { Terminal } from "./terminal"; -import { TerminalApi } from "../../api/terminal-api"; +import { TerminalApi, TerminalChannels } from "../../api/terminal-api"; import { dockStore, DockTab, DockTabCreateSpecific, TabId, TabKind } from "./dock.store"; import { WebSocketApiState } from "../../api/websocket-api"; import { Notifications } from "../notifications"; @@ -78,6 +78,8 @@ export class TerminalStore extends Singleton { this.connections.set(tabId, api); this.terminals.set(tabId, terminal); + + api.connect(); } disconnect(tabId: TabId) { @@ -135,7 +137,14 @@ export class TerminalStore extends Singleton { const terminalApi = this.connections.get(dockStore.selectedTabId); if (terminalApi) { - terminalApi.sendCommand(command + (enter ? "\r" : "")); + if (enter) { + command += "\r"; + } + + terminalApi.sendMessage({ + type: TerminalChannels.STDIN, + data: command, + }); } else { console.warn("The selected tab is does not have a connection. Cannot send command.", { tabId: dockStore.selectedTabId, command }); } diff --git a/src/renderer/components/dock/terminal.ts b/src/renderer/components/dock/terminal.ts index b9f5855203..07dcde55ae 100644 --- a/src/renderer/components/dock/terminal.ts +++ b/src/renderer/components/dock/terminal.ts @@ -24,11 +24,11 @@ import { reaction } from "mobx"; import { Terminal as XTerm } from "xterm"; import { FitAddon } from "xterm-addon-fit"; import { dockStore, TabId } from "./dock.store"; -import type { TerminalApi } from "../../api/terminal-api"; +import { TerminalApi, TerminalChannels } from "../../api/terminal-api"; import { ThemeStore } from "../../theme.store"; import { boundMethod, disposer } from "../../utils"; import { isMac } from "../../../common/vars"; -import { camelCase } from "lodash"; +import { camelCase, once } from "lodash"; import { UserStore } from "../../../common/user-store"; import { clipboard } from "electron"; import logger from "../../../common/logger"; @@ -119,11 +119,13 @@ export class Terminal { // bind events const onDataHandler = this.xterm.onData(this.onData); + const clearOnce = once(this.onClear); this.viewport.addEventListener("scroll", this.onScroll); this.elem.addEventListener("contextmenu", this.onContextMenu); - this.api.onReady.addListener(this.onClear, { once: true }); // clear status logs (connecting..) - this.api.onData.addListener(this.onApiData); + this.api.once("ready", clearOnce); + this.api.once("connected", clearOnce); + this.api.on("data", this.onApiData); window.addEventListener("resize", this.onResize); this.disposer.push( @@ -176,7 +178,10 @@ export class Terminal { onData = (data: string) => { if (!this.api.isReady) return; - this.api.sendCommand(data); + this.api.sendMessage({ + type: TerminalChannels.STDIN, + data, + }); }; onScroll = () => {