1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Fix handling of PTYs when renderer is reloaded (#4346)

Co-authored-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Sebastian Malton 2021-11-17 13:48:46 -05:00 committed by GitHub
parent e146dff9da
commit 0f2801552f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 294 additions and 176 deletions

View File

@ -29,6 +29,7 @@ import { get } from "lodash";
import { Node, NodesApi } from "../../common/k8s-api/endpoints";
import { KubeJsonApi } from "../../common/k8s-api/kube-json-api";
import logger from "../logger";
import { TerminalChannels } from "../../renderer/api/terminal-api";
export class NodeShellSession extends ShellSession {
ShellType = "node-shell";
@ -51,7 +52,10 @@ export class NodeShellSession extends ShellSession {
await this.waitForRunningPod();
} catch (error) {
this.deleteNodeShellPod();
this.sendResponse(`Error occurred: ${get(error, "response.body.message", error?.toString() || "unknown error")}`);
this.send({
type: TerminalChannels.STDOUT,
data: `Error occurred: ${get(error, "response.body.message", error?.toString() || "unknown error")}`,
});
throw new ShellOpenError("failed to create node pod", error);
}

View File

@ -32,6 +32,8 @@ import { UserStore } from "../../common/user-store";
import * as pty from "node-pty";
import { appEventBus } from "../../common/event-bus";
import logger from "../logger";
import { TerminalChannels, TerminalMessage } from "../../renderer/api/terminal-api";
import { deserialize, serialize } from "v8";
export class ShellOpenError extends Error {
constructor(message: string, public cause: Error) {
@ -145,18 +147,24 @@ export abstract class ShellSession {
protected abstract get cwd(): string | undefined;
protected ensureShellProcess(shell: string, args: string[], env: Record<string, string>, cwd: string): pty.IPty {
if (!ShellSession.processes.has(this.terminalId)) {
protected ensureShellProcess(shell: string, args: string[], env: Record<string, string>, cwd: string): { shellProcess: pty.IPty, resume: boolean } {
const resume = ShellSession.processes.has(this.terminalId);
if (!resume) {
ShellSession.processes.set(this.terminalId, pty.spawn(shell, args, {
rows: 30,
cols: 80,
cwd,
env,
name: "xterm-256color",
rows: 30,
}));
}
return ShellSession.processes.get(this.terminalId);
const shellProcess = ShellSession.processes.get(this.terminalId);
logger.info(`[SHELL-SESSION]: PTY for ${this.terminalId} is ${resume ? "resumed" : "started"} with PID=${shellProcess.pid}`);
return { shellProcess, resume };
}
constructor(protected websocket: WebSocket, protected cluster: Cluster, terminalId: string) {
@ -166,56 +174,88 @@ export abstract class ShellSession {
this.terminalId = `${cluster.id}:${terminalId}`;
}
protected send(message: TerminalMessage): void {
this.websocket.send(serialize(message));
}
protected async openShellProcess(shell: string, args: string[], env: Record<string, any>) {
const cwd = (this.cwd && await fse.pathExists(this.cwd))
? this.cwd
: env.HOME;
const shellProcess = this.ensureShellProcess(shell, args, env, cwd);
const { shellProcess, resume } = this.ensureShellProcess(shell, args, env, cwd);
if (resume) {
this.send({ type: TerminalChannels.CONNECTED });
}
this.running = true;
shellProcess.onData(data => this.sendResponse(data));
shellProcess.onData(data => this.send({ type: TerminalChannels.STDOUT, data }));
shellProcess.onExit(({ exitCode }) => {
logger.info(`[SHELL-SESSION]: shell has exited for ${this.terminalId} closed with exitcode=${exitCode}`);
// This might already be false because of the kill() within the websocket.on("close") handler
if (this.running) {
this.running = false;
if (exitCode > 0) {
this.sendResponse("Terminal will auto-close in 15 seconds ...");
this.send({ type: TerminalChannels.STDOUT, data: "Terminal will auto-close in 15 seconds ..." });
setTimeout(() => this.exit(), 15 * 1000);
} else {
this.exit();
}
}
});
this.websocket
.on("message", (data: string) => {
.on("message", (data: string | Uint8Array) => {
if (!this.running) {
return;
return void logger.debug(`[SHELL-SESSION]: received message from ${this.terminalId}, but shellProcess isn't running`);
}
const message = Buffer.from(data.slice(1, data.length), "base64").toString();
if (typeof data === "string") {
return void logger.silly(`[SHELL-SESSION]: Received message from ${this.terminalId}`, { data });
}
switch (data[0]) {
case "0":
shellProcess.write(message);
break;
case "4":
const { Width, Height } = JSON.parse(message);
try {
const message: TerminalMessage = deserialize(data);
shellProcess.resize(Width, Height);
switch (message.type) {
case TerminalChannels.STDIN:
shellProcess.write(message.data);
break;
case TerminalChannels.RESIZE:
shellProcess.resize(message.data.width, message.data.height);
break;
default:
logger.warn(`[SHELL-SESSION]: unknown or unhandleable message type for ${this.terminalId}`, message);
break;
}
} catch (error) {
logger.error(`[SHELL-SESSION]: failed to handle message for ${this.terminalId}`, error);
}
})
.on("close", (code) => {
logger.debug(`[SHELL-SESSION]: websocket for ${this.terminalId} closed with code=${code}`);
.on("close", code => {
logger.info(`[SHELL-SESSION]: websocket for ${this.terminalId} closed with code=${WebSocketCloseEvent[code]}(${code})`, { cluster: this.cluster.getMeta() });
if (this.running && code !== WebSocketCloseEvent.AbnormalClosure) {
// This code is the one that gets sent when the network is turned off
try {
logger.info(`[SHELL-SESSION]: Killing shell process for ${this.terminalId}`);
process.kill(shellProcess.pid);
ShellSession.processes.delete(this.terminalId);
} catch (e) {
}
const stopShellSession = this.running
&& (
(
code !== WebSocketCloseEvent.AbnormalClosure
&& code !== WebSocketCloseEvent.GoingAway
)
|| this.cluster.disconnected
);
if (stopShellSession) {
this.running = false;
try {
logger.info(`[SHELL-SESSION]: Killing shell process (pid=${shellProcess.pid}) for ${this.terminalId}`);
shellProcess.kill();
ShellSession.processes.delete(this.terminalId);
} catch (error) {
logger.warn(`[SHELL-SESSION]: failed to kill shell process (pid=${shellProcess.pid}) for ${this.terminalId}`, error);
}
}
});
@ -300,8 +340,4 @@ export abstract class ShellSession {
this.websocket.close(code);
}
}
protected sendResponse(msg: string) {
this.websocket.send(`1${Buffer.from(msg).toString("base64")}`);
}
}

View File

@ -19,22 +19,39 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
import { boundMethod, base64, EventEmitter, getHostedClusterId } from "../utils";
import { WebSocketApi } from "./websocket-api";
import { getHostedClusterId } from "../utils";
import { WebSocketApi, WebSocketEvents } from "./websocket-api";
import isEqual from "lodash/isEqual";
import { isDevelopment } from "../../common/vars";
import url from "url";
import { makeObservable, observable } from "mobx";
import { ipcRenderer } from "electron";
import logger from "../../common/logger";
import { deserialize, serialize } from "v8";
import { once } from "lodash";
export enum TerminalChannels {
STDIN = 0,
STDOUT = 1,
STDERR = 2,
TERMINAL_SIZE = 4,
TOKEN = 9,
STDIN = "stdin",
STDOUT = "stdout",
CONNECTED = "connected",
RESIZE = "resize",
}
export type TerminalMessage = {
type: TerminalChannels.STDIN,
data: string,
} | {
type: TerminalChannels.STDOUT,
data: string,
} | {
type: TerminalChannels.CONNECTED
} | {
type: TerminalChannels.RESIZE,
data: {
width: number,
height: number,
},
};
enum TerminalColor {
RED = "\u001b[31m",
GREEN = "\u001b[32m",
@ -53,17 +70,20 @@ export type TerminalApiQuery = Record<string, string> & {
type?: string;
};
export class TerminalApi extends WebSocketApi {
protected size: { Width: number; Height: number };
export interface TerminalEvents extends WebSocketEvents {
ready: () => void;
connected: () => void;
}
export class TerminalApi extends WebSocketApi<TerminalEvents> {
protected size: { width: number; height: number };
public onReady = new EventEmitter<[]>();
@observable public isReady = false;
constructor(protected query: TerminalApiQuery) {
super({
logging: isDevelopment,
flushOnOpen: false,
pingIntervalSeconds: 30,
pingInterval: 30,
});
makeObservable(this);
@ -100,56 +120,81 @@ export class TerminalApi extends WebSocketApi {
slashes: true,
});
this.onData.addListener(this._onReady, { prepend: true });
const onReady = once((data?: string) => {
this.isReady = true;
this.emit("ready");
this.removeListener("data", onReady);
this.removeListener("connected", onReady);
this.flush();
// data is undefined if the event that was handled is "connected"
if (data === undefined) {
/**
* Output the last line, the makes sure that the terminal isn't completely
* empty when the user refreshes.
*/
this.emit("data", window.localStorage.getItem(`${this.query.id}:last-data`));
}
});
this.prependListener("data", onReady);
this.prependListener("connected", onReady);
super.connect(socketUrl);
this.socket.binaryType = "arraybuffer";
}
destroy() {
if (!this.socket) return;
const exitCode = String.fromCharCode(4); // ctrl+d
const controlCode = String.fromCharCode(4); // ctrl+d
this.sendCommand(exitCode);
this.sendMessage({ type: TerminalChannels.STDIN, data: controlCode });
setTimeout(() => super.destroy(), 2000);
}
removeAllListeners() {
super.removeAllListeners();
this.onReady.removeAllListeners();
}
@boundMethod
protected _onReady(data: string) {
if (!data) return true;
this.isReady = true;
this.onReady.emit();
this.onData.removeListener(this._onReady);
this.flush();
this.onData.emit(data); // re-emit data
return false; // prevent calling rest of listeners
}
reconnect() {
super.reconnect();
}
sendCommand(key: string, channel = TerminalChannels.STDIN) {
return this.send(channel + base64.encode(key));
sendMessage(message: TerminalMessage) {
return this.send(serialize(message));
}
sendTerminalSize(cols: number, rows: number) {
const newSize = { Width: cols, Height: rows };
const newSize = { width: cols, height: rows };
if (!isEqual(this.size, newSize)) {
this.sendCommand(JSON.stringify(newSize), TerminalChannels.TERMINAL_SIZE);
this.sendMessage({
type: TerminalChannels.RESIZE,
data: newSize,
});
this.size = newSize;
}
}
protected parseMessage(data: string) {
data = data.substr(1); // skip channel
protected _onMessage({ data, ...evt }: MessageEvent<ArrayBuffer>): void {
try {
const message: TerminalMessage = deserialize(new Uint8Array(data));
return base64.decode(data);
switch (message.type) {
case TerminalChannels.STDOUT:
/**
* save the last data for reconnections. User localStorage because we
* don't want this data to survive if the app is closed
*/
window.localStorage.setItem(`${this.query.id}:last-data`, message.data);
super._onMessage({ data: message.data, ...evt });
break;
case TerminalChannels.CONNECTED:
this.emit("connected");
break;
default:
logger.warn(`[TERMINAL-API]: unknown or unhandleable message type`, message);
break;
}
} catch (error) {
logger.error(`[TERMINAL-API]: failed to handle message`, error);
}
}
protected _onOpen(evt: Event) {
@ -166,16 +211,13 @@ export class TerminalApi extends WebSocketApi {
protected emitStatus(data: string, options: { color?: TerminalColor; showTime?: boolean } = {}) {
const { color, showTime } = options;
const time = showTime ? `${(new Date()).toLocaleString()} ` : "";
if (color) {
data = `${color}${data}${TerminalColor.NO_COLOR}`;
}
let time;
if (showTime) {
time = `${(new Date()).toLocaleString()} `;
}
this.onData.emit(`${showTime ? time : ""}${data}\r\n`);
this.emit("data", `${time}${data}\r\n`);
}
protected emitError(error: string) {

View File

@ -20,20 +20,48 @@
*/
import { observable, makeObservable } from "mobx";
import { EventEmitter } from "../../common/event-emitter";
import EventEmitter from "events";
import type TypedEventEmitter from "typed-emitter";
import type { Arguments } from "typed-emitter";
import { isDevelopment } from "../../common/vars";
interface IParams {
url?: string; // connection url, starts with ws:// or wss://
autoConnect?: boolean; // auto-connect in constructor
flushOnOpen?: boolean; // flush pending commands on open socket
reconnectDelaySeconds?: number; // reconnect timeout in case of error (0 - don't reconnect)
pingIntervalSeconds?: number; // send ping message for keeping connection alive in some env, e.g. AWS (0 - don't ping)
logging?: boolean; // show logs in console
}
interface WebsocketApiParams {
/**
* Flush pending commands on open socket
*
* @default true
*/
flushOnOpen?: boolean;
interface IMessage {
id: string;
data: string;
/**
* In case of an error, wait this many seconds before reconnecting.
*
* If falsy, don't reconnect
*
* @default 10
*/
reconnectDelay?: number;
/**
* The message for pinging the websocket
*
* @default "PING"
*/
pingMessage?: string | ArrayBufferLike | Blob | ArrayBufferView;
/**
* If set to a number > 0, then the API will ping the socket on that interval.
*
* @unit seconds
*/
pingInterval?: number;
/**
* Whether to show logs in the console
*
* @default isDevelopment
*/
logging?: boolean;
}
export enum WebSocketApiState {
@ -44,79 +72,74 @@ export enum WebSocketApiState {
CLOSED = "closed",
}
export class WebSocketApi {
export interface WebSocketEvents {
open: () => void,
data: (message: string) => void;
close: () => void;
}
type Defaulted<Params, DefaultParams extends keyof Params> = Required<Pick<Params, DefaultParams>> & Omit<Params, DefaultParams>;
export class WebSocketApi<Events extends WebSocketEvents> extends (EventEmitter as { new<T>(): TypedEventEmitter<T> })<Events> {
protected socket: WebSocket;
protected pendingCommands: IMessage[] = [];
protected reconnectTimer: any;
protected pingTimer: any;
protected pingMessage = "PING";
protected pendingCommands: (string | ArrayBufferLike | Blob | ArrayBufferView)[] = [];
protected reconnectTimer?: any;
protected pingTimer?: any;
protected params: Defaulted<WebsocketApiParams, keyof typeof WebSocketApi["defaultParams"]>;
@observable readyState = WebSocketApiState.PENDING;
public onOpen = new EventEmitter<[]>();
public onData = new EventEmitter<[string]>();
public onClose = new EventEmitter<[]>();
static defaultParams: Partial<IParams> = {
autoConnect: true,
logging: false,
reconnectDelaySeconds: 10,
pingIntervalSeconds: 0,
private static defaultParams = {
logging: isDevelopment,
reconnectDelay: 10,
flushOnOpen: true,
pingMessage: "PING",
};
constructor(protected params: IParams) {
constructor(params: WebsocketApiParams) {
super();
makeObservable(this);
this.params = Object.assign({}, WebSocketApi.defaultParams, params);
const { autoConnect, pingIntervalSeconds } = this.params;
const { pingInterval } = this.params;
if (autoConnect) {
setTimeout(() => this.connect());
}
if (pingIntervalSeconds) {
this.pingTimer = setInterval(() => this.ping(), pingIntervalSeconds * 1000);
if (pingInterval) {
this.pingTimer = setInterval(() => this.ping(), pingInterval * 1000);
}
}
get isConnected() {
const state = this.socket ? this.socket.readyState : -1;
return state === WebSocket.OPEN && this.isOnline;
return this.socket?.readyState === WebSocket.OPEN && this.isOnline;
}
get isOnline() {
return navigator.onLine;
}
setParams(params: Partial<IParams>) {
Object.assign(this.params, params);
}
connect(url: string) {
// close previous connection first
this.socket?.close();
connect(url = this.params.url) {
if (this.socket) {
this.socket.close(); // close previous connection first
}
// start new connection
this.socket = new WebSocket(url);
this.socket.onopen = this._onOpen.bind(this);
this.socket.onmessage = this._onMessage.bind(this);
this.socket.onerror = this._onError.bind(this);
this.socket.onclose = this._onClose.bind(this);
this.socket.addEventListener("open", ev => this._onOpen(ev));
this.socket.addEventListener("message", ev => this._onMessage(ev));
this.socket.addEventListener("error", ev => this._onError(ev));
this.socket.addEventListener("close", ev => this._onClose(ev));
this.readyState = WebSocketApiState.CONNECTING;
}
ping() {
if (!this.isConnected) return;
this.send(this.pingMessage);
if (this.isConnected) {
this.send(this.params.pingMessage);
}
}
reconnect() {
const { reconnectDelaySeconds } = this.params;
reconnect(): void {
if (!this.socket) {
return void console.error("[WEBSOCKET-API]: cannot reconnect to a socket that is not connected");
}
if (!reconnectDelaySeconds) return;
this.writeLog("reconnect after", `${reconnectDelaySeconds}ms`);
this.reconnectTimer = setTimeout(() => this.connect(), reconnectDelaySeconds * 1000);
this.readyState = WebSocketApiState.RECONNECTING;
this.connect(this.socket.url);
}
destroy() {
@ -124,52 +147,43 @@ export class WebSocketApi {
this.socket.close();
this.socket = null;
this.pendingCommands = [];
this.removeAllListeners();
this.clearAllListeners();
clearTimeout(this.reconnectTimer);
clearInterval(this.pingTimer);
this.readyState = WebSocketApiState.PENDING;
}
removeAllListeners() {
this.onOpen.removeAllListeners();
this.onData.removeAllListeners();
this.onClose.removeAllListeners();
clearAllListeners() {
for (const name of this.eventNames()) {
this.removeAllListeners(name as keyof Events);
}
}
send(command: string) {
const msg: IMessage = {
id: (Math.random() * Date.now()).toString(16).replace(".", ""),
data: command,
};
send(command: string | ArrayBufferLike | Blob | ArrayBufferView) {
if (this.isConnected) {
this.socket.send(msg.data);
}
else {
this.pendingCommands.push(msg);
this.socket.send(command);
} else {
this.pendingCommands.push(command);
}
}
protected flush() {
this.pendingCommands.forEach(msg => this.send(msg.data));
for (const command of this.pendingCommands) {
this.send(command);
}
this.pendingCommands.length = 0;
}
protected parseMessage(data: string) {
return data;
}
protected _onOpen(evt: Event) {
this.onOpen.emit();
this.emit("open", ...[] as Arguments<Events["open"]>);
if (this.params.flushOnOpen) this.flush();
this.readyState = WebSocketApiState.OPEN;
this.writeLog("%cOPEN", "color:green;font-weight:bold;", evt);
}
protected _onMessage(evt: MessageEvent) {
const data = this.parseMessage(evt.data);
this.onData.emit(data);
protected _onMessage({ data }: MessageEvent): void {
this.emit("data", ...[data] as Arguments<Events["data"]>);
this.writeLog("%cMESSAGE", "color:black;font-weight:bold;", data);
}
@ -181,18 +195,26 @@ export class WebSocketApi {
const error = evt.code !== 1000 || !evt.wasClean;
if (error) {
this.reconnect();
const { reconnectDelay } = this.params;
if (reconnectDelay) {
const url = this.socket.url;
this.writeLog("will reconnect in", `${reconnectDelay}s`);
this.reconnectTimer = setTimeout(() => this.connect(url), reconnectDelay * 1000);
this.readyState = WebSocketApiState.RECONNECTING;
}
else {
} else {
this.readyState = WebSocketApiState.CLOSED;
this.onClose.emit();
this.emit("close", ...[] as Arguments<Events["close"]>);
}
this.writeLog("%cCLOSE", `color:${error ? "red" : "black"};font-weight:bold;`, evt);
}
protected writeLog(...data: any[]) {
if (this.params.logging) {
console.log(...data);
console.debug(...data);
}
}
}

View File

@ -22,7 +22,7 @@
import { autorun, observable, when } from "mobx";
import { autoBind, noop, Singleton } from "../../utils";
import { Terminal } from "./terminal";
import { TerminalApi } from "../../api/terminal-api";
import { TerminalApi, TerminalChannels } from "../../api/terminal-api";
import { dockStore, DockTab, DockTabCreateSpecific, TabId, TabKind } from "./dock.store";
import { WebSocketApiState } from "../../api/websocket-api";
import { Notifications } from "../notifications";
@ -78,6 +78,8 @@ export class TerminalStore extends Singleton {
this.connections.set(tabId, api);
this.terminals.set(tabId, terminal);
api.connect();
}
disconnect(tabId: TabId) {
@ -135,7 +137,14 @@ export class TerminalStore extends Singleton {
const terminalApi = this.connections.get(dockStore.selectedTabId);
if (terminalApi) {
terminalApi.sendCommand(command + (enter ? "\r" : ""));
if (enter) {
command += "\r";
}
terminalApi.sendMessage({
type: TerminalChannels.STDIN,
data: command,
});
} else {
console.warn("The selected tab is does not have a connection. Cannot send command.", { tabId: dockStore.selectedTabId, command });
}

View File

@ -24,11 +24,11 @@ import { reaction } from "mobx";
import { Terminal as XTerm } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import { dockStore, TabId } from "./dock.store";
import type { TerminalApi } from "../../api/terminal-api";
import { TerminalApi, TerminalChannels } from "../../api/terminal-api";
import { ThemeStore } from "../../theme.store";
import { boundMethod, disposer } from "../../utils";
import { isMac } from "../../../common/vars";
import { camelCase } from "lodash";
import { camelCase, once } from "lodash";
import { UserStore } from "../../../common/user-store";
import { clipboard } from "electron";
import logger from "../../../common/logger";
@ -119,11 +119,13 @@ export class Terminal {
// bind events
const onDataHandler = this.xterm.onData(this.onData);
const clearOnce = once(this.onClear);
this.viewport.addEventListener("scroll", this.onScroll);
this.elem.addEventListener("contextmenu", this.onContextMenu);
this.api.onReady.addListener(this.onClear, { once: true }); // clear status logs (connecting..)
this.api.onData.addListener(this.onApiData);
this.api.once("ready", clearOnce);
this.api.once("connected", clearOnce);
this.api.on("data", this.onApiData);
window.addEventListener("resize", this.onResize);
this.disposer.push(
@ -176,7 +178,10 @@ export class Terminal {
onData = (data: string) => {
if (!this.api.isReady) return;
this.api.sendCommand(data);
this.api.sendMessage({
type: TerminalChannels.STDIN,
data,
});
};
onScroll = () => {