import net from "net"; import http from "http"; import spdy from "spdy"; import httpProxy from "http-proxy"; import url from "url"; import * as WebSocket from "ws"; import { apiPrefix, apiKubePrefix } from "../common/vars"; import { openShell } from "./node-shell-session"; import { Router } from "./router"; import { ClusterManager } from "./cluster-manager"; import { ContextHandler } from "./context-handler"; import logger from "./logger"; export class LensProxy { protected origin: string protected proxyServer: http.Server protected router: Router protected closed = false protected retryCounters = new Map() static create(port: number, clusterManager: ClusterManager) { return new LensProxy(port, clusterManager).listen(); } private constructor(protected port: number, protected clusterManager: ClusterManager) { this.origin = `http://localhost:${port}`; this.router = new Router(); } listen(port = this.port): this { this.proxyServer = this.buildCustomProxy().listen(port); logger.info(`LensProxy server has started at ${this.origin}`); return this; } close() { logger.info("Closing proxy server"); this.proxyServer.close(); this.closed = true; } protected buildCustomProxy(): http.Server { const proxy = this.createProxy(); const spdyProxy = spdy.createServer({ spdy: { plain: true, protocols: ["http/1.1", "spdy/3.1"] } }, (req: http.IncomingMessage, res: http.ServerResponse) => { this.handleRequest(proxy, req, res); }); spdyProxy.on("upgrade", (req: http.IncomingMessage, socket: net.Socket, head: Buffer) => { if (req.url.startsWith(`${apiPrefix}?`)) { this.handleWsUpgrade(req, socket, head); } else { this.handleProxyUpgrade(proxy, req, socket, head); } }); spdyProxy.on("error", (err) => { logger.error("proxy error", err); }); return spdyProxy; } protected async handleProxyUpgrade(proxy: httpProxy, req: http.IncomingMessage, socket: net.Socket, head: Buffer) { const cluster = this.clusterManager.getClusterForRequest(req); if (cluster) { const proxyUrl = await cluster.contextHandler.resolveAuthProxyUrl() + req.url.replace(apiKubePrefix, ""); const apiUrl = url.parse(cluster.apiUrl); const pUrl = url.parse(proxyUrl); const connectOpts = { port: parseInt(pUrl.port), host: pUrl.hostname }; const proxySocket = new net.Socket(); proxySocket.connect(connectOpts, () => { proxySocket.write(`${req.method} ${pUrl.path} HTTP/1.1\r\n`); proxySocket.write(`Host: ${apiUrl.host}\r\n`); for (let i = 0; i < req.rawHeaders.length; i += 2) { const key = req.rawHeaders[i]; if (key !== "Host" && key !== "Authorization") { proxySocket.write(`${req.rawHeaders[i]}: ${req.rawHeaders[i+1]}\r\n`); } } proxySocket.write("\r\n"); proxySocket.write(head); }); proxySocket.setKeepAlive(true); socket.setKeepAlive(true); proxySocket.setTimeout(0); socket.setTimeout(0); proxySocket.on('data', function (chunk) { socket.write(chunk); }); proxySocket.on('end', function () { socket.end(); }); proxySocket.on('error', function (err) { socket.write("HTTP/" + req.httpVersion + " 500 Connection error\r\n\r\n"); socket.end(); }); socket.on('data', function (chunk) { proxySocket.write(chunk); }); socket.on('end', function () { proxySocket.end(); }); socket.on('error', function () { proxySocket.end(); }); } } protected createProxy(): httpProxy { const proxy = httpProxy.createProxyServer(); proxy.on("error", (error, req, res, target) => { if (this.closed) { return; } if (target) { logger.debug("Failed proxy to target: " + JSON.stringify(target, null, 2)); if (req.method === "GET" && (!res.statusCode || res.statusCode >= 500)) { const reqId = this.getRequestId(req); const retryCount = this.retryCounters.get(reqId) || 0; const timeoutMs = retryCount * 250; if (retryCount < 20) { logger.debug(`Retrying proxy request to url: ${reqId}`); setTimeout(() => { this.retryCounters.set(reqId, retryCount + 1); this.handleRequest(proxy, req, res); }, timeoutMs); } } } try { res.writeHead(500).end("Oops, something went wrong."); } catch (e) { logger.error(`[LENS-PROXY]: Failed to write headers: `, e); } }); return proxy; } protected createWsListener(): WebSocket.Server { const ws = new WebSocket.Server({ noServer: true }); return ws.on("connection", ((socket: WebSocket, req: http.IncomingMessage) => { const cluster = this.clusterManager.getClusterForRequest(req); const nodeParam = url.parse(req.url, true).query["node"]?.toString(); openShell(socket, cluster, nodeParam); })); } protected async getProxyTarget(req: http.IncomingMessage, contextHandler: ContextHandler): Promise { if (req.url.startsWith(apiKubePrefix)) { delete req.headers.authorization; req.url = req.url.replace(apiKubePrefix, ""); const isWatchRequest = req.url.includes("watch="); return await contextHandler.getApiTarget(isWatchRequest); } } protected getRequestId(req: http.IncomingMessage) { return req.headers.host + req.url; } protected async handleRequest(proxy: httpProxy, req: http.IncomingMessage, res: http.ServerResponse) { const cluster = this.clusterManager.getClusterForRequest(req); if (cluster) { const proxyTarget = await this.getProxyTarget(req, cluster.contextHandler); if (proxyTarget) { // allow to fetch apis in "clusterId.localhost:port" from "localhost:port" res.setHeader("Access-Control-Allow-Origin", this.origin); return proxy.web(req, res, proxyTarget); } } this.router.route(cluster, req, res); } protected async handleWsUpgrade(req: http.IncomingMessage, socket: net.Socket, head: Buffer) { const wsServer = this.createWsListener(); wsServer.handleUpgrade(req, socket, head, (con) => { wsServer.emit("connection", con, req); }); } }