1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

refactor watch api to main & eliminate lens-server process

Signed-off-by: Jari Kolehmainen <jari.kolehmainen@gmail.com>
This commit is contained in:
Jari Kolehmainen 2020-03-23 17:06:45 +02:00
parent c0fb808365
commit 420f4306bf
24 changed files with 133 additions and 815 deletions

View File

@ -1,23 +0,0 @@
// Get certificate auth data
import * as fs from "fs";
import * as util from "util";
import config from "../config";
let caData: string = null
export async function getCertificateAuthorityData(encoding = 'utf8'): Promise<string> {
if (caData) {
return caData
}
if (!fs.existsSync(config.KUBERNETES_CA_CERT)) {
caData = config.KUBERNETES_CA_CERT
return caData
}
try {
const ca = await util.promisify(fs.readFile)(config.KUBERNETES_CA_CERT);
return Buffer.from(ca).toString(encoding);
} catch (error) {
return ''
}
}

View File

@ -1,20 +0,0 @@
// Get cluster info
import { kubeRequest } from "./kube-request";
import { IClusterInfo } from "../common/cluster"
export async function getClusterInfo(): Promise<IClusterInfo> {
const [kubeVersion] = await Promise.all([
getKubeVersion().catch(() => null),
]);
return {
kubeVersion,
};
}
export async function getKubeVersion() {
const res = await kubeRequest<{ gitVersion: string }>({
path: "/version",
});
return res.gitVersion.slice(1);
}

View File

@ -1,22 +0,0 @@
// Get service-account token
import { existsSync, readFile } from "fs";
import { promisify } from "util";
import config from "../config"
const tokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token";
export async function getServiceAccountToken() {
const { SERVICE_ACCOUNT_TOKEN } = config;
if (SERVICE_ACCOUNT_TOKEN) {
return SERVICE_ACCOUNT_TOKEN;
}
if (existsSync(tokenPath)) {
const token = await promisify(readFile)(tokenPath);
return token.toString().trim();
}
return null;
}

View File

@ -1,19 +0,0 @@
// Check cluster-admin rights for auth-token
// CLI: kubectl auth can-i '*' '*' --all-namespaces
import { reviewResourceAccess } from "./review-resource-access";
import { IKubeRequestParams } from "./kube-request";
export async function isClusterAdmin(params: Partial<IKubeRequestParams>): Promise<boolean> {
try {
const accessCheck = await reviewResourceAccess(params, {
resource: "*",
namespace: "*",
group: "*",
verb: "*",
});
return accessCheck.allowed;
} catch (err) {
return false;
}
}

View File

@ -1,58 +0,0 @@
// Kubernetes request api helper
import config, { isSecure } from "../config";
import axios, { AxiosError, AxiosRequestConfig } from "axios"
import * as https from "https";
import { getCertificateAuthorityData } from "./get-cert-auth-data";
import { logger, sanitizeHeaders } from "../utils/logger";
import { getServiceAccountToken } from "./get-service-account-token";
export interface IKubeRequestParams extends AxiosRequestConfig {
path: string;
authHeader?: string;
}
export async function kubeRequest<T>(params: IKubeRequestParams): Promise<T> {
const { KUBE_CLUSTER_URL, KUBERNETES_CLIENT_CERT, KUBERNETES_CLIENT_KEY } = config;
const serviceToken = await getServiceAccountToken();
const defaultAuthHeader = serviceToken ? `Bearer ${serviceToken}` : "";
const {
authHeader = defaultAuthHeader,
url = KUBE_CLUSTER_URL,
path = "",
...reqConfig
} = params;
// add access token
reqConfig.headers = Object.assign({}, reqConfig.headers, {
"Content-type": "application/json",
});
if (!KUBERNETES_CLIENT_CERT && authHeader) {
reqConfig.headers["Authorization"] = authHeader;
}
// allow requests to kube-cluster without valid ssl certs..
reqConfig.httpsAgent = new https.Agent({
rejectUnauthorized: isSecure(),
cert: KUBERNETES_CLIENT_CERT,
key: KUBERNETES_CLIENT_KEY,
ca: await getCertificateAuthorityData(),
});
const reqUrl = url + path;
return axios(reqUrl, reqConfig)
.then(res => res.data)
.catch((error: AxiosError<T>) => {
const { message, config } = error;
logger.error(`[KUBE-REQUEST]: ${message}`, {
code: error.code,
method: config.method,
url: config.url,
headers: sanitizeHeaders(config.headers),
params: config.params,
});
throw error;
});
}

View File

@ -1,47 +0,0 @@
// Get resource access review
// Docs: https://kubernetes.io/docs/reference/access-authn-authz/authorization/
import { IKubeRequestParams, kubeRequest } from "./kube-request";
interface IResourceAccess {
apiVersion: string;
kind: string;
status: IResourceAccessStatus;
}
export interface IResourceAccessStatus {
allowed: boolean;
denied?: boolean;
reason?: string;
evaluationError?: string;
}
interface IResourceAccessAttributes {
group?: string | "*";
resource?: string | "*";
verb?: "get" | "list" | "create" | "update" | "patch" | "watch" | "proxy" | "redirect" | "delete" | "deletecollection" | "*";
namespace?: string | "*";
}
export async function reviewResourceAccess(
params: Partial<IKubeRequestParams> = {},
attrs: IResourceAccessAttributes
): Promise<IResourceAccessStatus> {
try {
const accessReview = await kubeRequest<IResourceAccess>({
...params,
method: "POST",
path: "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews",
data: {
spec: {
resourceAttributes: attrs
}
}
});
return accessReview.status;
} catch (err) {
return {
allowed: false,
reason: err.toString(),
}
}
}

View File

@ -1,39 +0,0 @@
// Check validity of auth-token
import { kubeRequest } from "./kube-request";
export interface ITokenReview {
apiVersion: string;
kind: string;
status: ITokenReviewStatus;
}
export interface ITokenReviewStatus {
authenticated: boolean;
user: {
username?: string;
uid?: string;
groups?: string[];
};
error?: string[];
}
export async function reviewToken(authToken: string): Promise<ITokenReviewStatus> {
try {
const tokenReview = await kubeRequest<ITokenReview>({
path: "/apis/authentication.k8s.io/v1/tokenreviews",
method: "POST",
data: {
spec: {
token: authToken
}
}
});
return tokenReview.status;
} catch (err) {
return {
authenticated: false,
user: {},
error: [err.toString()],
}
}
}

View File

@ -1,73 +0,0 @@
import config, { BUILD_DIR, CLIENT_DIR } from "../server/config"
import path from "path"
import fs from "fs"
import express from "express"
import cookieSession from "cookie-session"
import compression from "compression"
import helmet from "helmet"
import morgan from "morgan"
import { logger } from "../server/utils/logger"
import { kubewatchRoute, readyStateRoute } from "../server/routes";
import { useRequestHeaderToken } from "../server/middlewares";
const {
IS_PRODUCTION, LOCAL_SERVER_PORT, API_PREFIX,
SESSION_NAME, SESSION_SECRET,
} = config;
const app = express();
const localApis = express.Router();
const outputDir = path.resolve(process.cwd(), BUILD_DIR, CLIENT_DIR);
app.set('trust proxy', 1); // trust first proxy
localApis.use(
readyStateRoute(),
kubewatchRoute(),
);
// https://github.com/expressjs/cookie-session
app.use(cookieSession({
name: SESSION_NAME,
secret: SESSION_SECRET,
secure: IS_PRODUCTION,
httpOnly: true,
maxAge: 365 * 24 * 60 * 60 * 1000, // 1 year
}));
// protect from well-known web vulnerabilities by setting HTTP headers appropriately
// https://github.com/helmetjs/helmet
app.use(helmet({
hsts: {
includeSubDomains: false,
}
}));
// use auth-token from request headers (if applicable via proxy)
app.use(useRequestHeaderToken());
// requests logging
app.use(morgan('tiny'));
// enable gzip compression
app.use(compression());
app.use(express.urlencoded({ extended: true })); // for parsing application/x-www-form-urlencoded
app.use("/", express.static(outputDir)); // handle static files (assets)
app.use(API_PREFIX.BASE, express.json({ limit: "10mb" }), localApis);
// handle all page requests via index.html, in development mode it's managed by webpack-dev-server
app.all('*', (req, res) => {
const indexHtml = path.resolve(outputDir, 'index.html');
if (fs.existsSync(indexHtml)) res.sendFile(indexHtml);
else {
res.send("Error: build/index.html doesn't exists");
}
});
// run server
const server = app.listen(LOCAL_SERVER_PORT, "127.0.0.1", () => {
logger.appStarted(LOCAL_SERVER_PORT, 'Server started');
});

View File

@ -1,3 +0,0 @@
export * from "./kube-proxy"
export * from "./terminal-proxy"
export * from "./use-header-token"

View File

@ -1,24 +0,0 @@
import { Request } from "express";
import proxy from "http-proxy-middleware"
import { userSession } from "../user-session";
import config, { isSecure } from "../config";
export function kubeProxy(serviceUrl: string, proxyConfig: proxy.Config = {}) {
const { IS_PRODUCTION } = config;
return proxy({
target: serviceUrl,
secure: isSecure(), // verify the ssl certs
logLevel: IS_PRODUCTION ? "info" : "debug",
changeOrigin: true, // needed for virtual hosted sites
pathRewrite: (path, req: Request) => {
return path.replace(req.baseUrl, ""); // remove client-prefix, e.g "/api-kube"
},
onProxyReq(proxyReq, req: Request, res) {
const { authHeader } = userSession.get(req);
if (authHeader) {
proxyReq.setHeader("Authorization", authHeader);
}
},
...proxyConfig,
})
}

View File

@ -1,19 +0,0 @@
import { NextFunction } from "express";
import proxy from "http-proxy-middleware"
import appConfig from "../config"
const { KUBE_TERMINAL_URL, API_PREFIX, IS_PRODUCTION } = appConfig;
interface ITerminalProxy extends NextFunction {
upgrade: () => void;
}
export const terminalProxy = proxy({
target: KUBE_TERMINAL_URL,
ws: true,
changeOrigin: true,
logLevel: IS_PRODUCTION ? "info" : "debug",
pathRewrite: {
["^" + API_PREFIX.TERMINAL]: "" // remove api-prefix
}
}) as ITerminalProxy;

View File

@ -1,20 +0,0 @@
// Allow to use "Authorization" from request for auto-login (when provided by proxy)
import { NextFunction, Request, Response } from "express"
import { userSession } from "../user-session";
export function useRequestHeaderToken() {
return (req: Request, res: Response, next: NextFunction) => {
const authorization = req.headers["authorization"] || req.headers["x-lens-kubectl-token"];
const { authHeader, isUserLogin } = userSession.get(req);
const userHasOwnToken = authHeader && isUserLogin;
// don't overwrite user's login credentials
if (authorization && !userHasOwnToken && authHeader !== authorization) {
userSession.save(req, {
authHeader: authorization.toString(),
});
}
next();
}
}

View File

@ -1,3 +0,0 @@
export * from "./kubewatch-route"
export * from "./metrics-route"
export * from "./ready-state-route"

View File

@ -1,139 +0,0 @@
//-- Streaming k8s watch-api events
import axios from "axios"
import { Router } from "express";
import { IncomingMessage } from "http";
import { kubeRequest } from "../api/kube-request";
import { IKubeWatchEvent, IKubeWatchRouteEvent, IKubeWatchRouteQuery} from "../common/kubewatch"
import { userSession } from "../user-session";
import { logger } from "../utils/logger";
export function kubewatchRoute() {
const router = Router();
router.route('/watch')
.get(async (req, res) => {
const { authHeader } = userSession.get(req);
const queryParams: IKubeWatchRouteQuery = req.query;
const apis: string[] = [].concat(queryParams.api || []);
const streams = new Map<string, IncomingMessage>();
const eventsBuffer = new Map<string, IKubeWatchEvent>();
let isClosing = false;
if (!apis.length) {
res.status(400).json({
message: "Empty request. Query params 'api' are not provided.",
example: "?api=/api/v1/pods&api=/api/v1/nodes",
});
return;
}
res.header({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
});
// init streams
const cancelToken = axios.CancelToken.source();
apis.forEach(apiUrl => {
console.log("[KUBE-WATCH] init stream", apiUrl);
const connecting = kubeRequest<IncomingMessage>({
path: apiUrl,
responseType: "stream",
authHeader: authHeader,
cancelToken: cancelToken.token,
});
connecting.then(stream => {
streams.set(apiUrl, stream); // save connection for clean up
stream.socket.setKeepAlive(true); // keep connection alive
let lastUnusedBuffer = ""
return stream
.on("data", (buffer: Buffer) => {
const data = lastUnusedBuffer + buffer.toString().trim();
data.split("\n").map(str => {
try {
const eventObj = JSON.parse(str);
bufferEvent(eventObj); // handle
lastUnusedBuffer = ""; // clean up since parsing was successful
} catch (err) {
lastUnusedBuffer = str; // invalid json, tail must wait next incoming data
}
});
})
.on("end", () => {
// client must update resource-version and try to reconnect
console.log(`[KUBE-WATCH] stream ended ${apiUrl}`)
sendEvent({
type: "STREAM_END",
url: apiUrl,
status: stream.statusCode,
})
});
}, err => {
logger.error(`[KUBE-WATCH] error ${apiUrl}`, err);
sendEvent({
type: "STREAM_END",
url: apiUrl,
status: 410,
})
});
});
function getEventBufferId(evt: IKubeWatchEvent) {
const { object, type } = evt;
const { kind } = object;
let { metadata: { uid } } = object;
if (kind === "Event") {
uid = (object as any).involvedObject.uid; // reason: uid for events always unique
}
return `${type}:${kind}-${uid}`
}
function bufferEvent(evt: IKubeWatchEvent) {
const id = getEventBufferId(evt);
if (eventsBuffer.has(id)) {
eventsBuffer.delete(id); // clear to move event to the end in map's "timeline"
}
eventsBuffer.set(id, evt); // save latest event by object's identity
}
function sendEvent(evt: IKubeWatchEvent | IKubeWatchRouteEvent, autoFlush = true) {
if (isClosing) return;
// convert to "text/event-stream" format
res.write(`data: ${JSON.stringify(evt)}\n\n`);
if (autoFlush) {
// @ts-ignore
res.flush();
}
}
// process sending events
const flushInterval = setInterval(() => {
const eventsPack = Array.from(eventsBuffer.entries())
.slice(0, 100) // max limit per sending
.map(([id, evt]) => {
eventsBuffer.delete(id); // clean up used event
return evt;
});
if (eventsPack.length > 0) {
eventsPack.forEach(evt => sendEvent(evt, false));
// @ts-ignore
res.flush();
}
}, 1000);
function onClose() {
if (isClosing) return;
isClosing = true;
clearInterval(flushInterval);
streams.forEach(stream => stream.removeAllListeners("end"));
cancelToken.cancel();
}
req.on("close", onClose);
res.on("finish", onClose);
});
return router;
}

View File

@ -1,16 +0,0 @@
//-- App readiness checker
import { Router } from "express";
export function readyStateRoute() {
const router = Router();
router.route('/ready')
.get(async (req, res) => {
const serviceWaitingList: string[] = [];
res.json(serviceWaitingList);
});
return router;
}

View File

@ -1,24 +0,0 @@
//-- User sessions helper
import { Request } from "express";
import CookieSessionObject = CookieSessionInterfaces.CookieSessionObject;
interface IUserSession extends CookieSessionObject {
authHeader: string;
username?: string;
isUserLogin?: boolean; // authorization via user's manual login with credentials
}
export const userSession = {
get(req: Request): Partial<IUserSession> {
return req.session;
},
save(req: Request, data: Partial<IUserSession> = {}) {
Object.assign(req.session, data);
},
getToken(req: Request): string {
const { authHeader = "" } = this.get(req);
const [type, token = ""] = authHeader.split(" ");
return token;
}
};

View File

@ -1,32 +0,0 @@
// Load & parse local kubernetes config (dev-only)
import * as jsYaml from "js-yaml"
import * as fs from "fs"
import * as os from "os"
import chalk from "chalk";
import { logger } from "./logger";
interface IKubeConfigParams {
clusterUrl: string;
userToken: string;
}
export function getKubeConfigDev(): Partial<IKubeConfigParams> {
const KUBE_CONFIG_FILE = process.env.KUBE_CONFIG_FILE;
if (!KUBE_CONFIG_FILE) {
return {}
}
let filePath = ""
try {
filePath = KUBE_CONFIG_FILE.replace("~", os.homedir());
const yaml = fs.readFileSync(filePath).toString();
const config = jsYaml.safeLoad(yaml);
return {
clusterUrl: config.clusters[0].cluster.server,
userToken: config.users[0].user.token,
}
} catch (err) {
logger.error(`[KUBE-CONFIG] Parsing config file ${chalk.bold(filePath)} failed.`, err)
return {};
}
}

View File

@ -1,36 +0,0 @@
import chalk from "chalk";
import * as ip from "ip"
const divider = chalk.gray('-----------------------------------');
export const logger = {
// Called when express.js app starts on given port w/o errors
appStarted: (port: string | number, title = 'Server started ') => {
console.log(chalk.underline.bold(title) + ` ${chalk.green('✓')}`);
console.log(`
${chalk.bold('Access URLs:')}
${divider}
Localhost: ${chalk.magenta(`http://localhost:${port}`)}
LAN: ${chalk.magenta(`http://${ip.address()}:${port}`)}
${divider}
`);
},
error(message: string, error: any) {
let errString = ""
try {
errString = JSON.stringify(error, null, 2);
} catch (e) {
errString = String(error);
}
console.error(chalk.bold.red(`[ERROR] -> ${message}`), errString);
}
};
export function sanitizeHeaders(headers: { [name: string]: string }) {
if (headers.Authorization) {
const [authType, authToken] = headers.Authorization.split(" ");
headers.Authorization = `${authType} *****`
}
return headers;
}

View File

@ -1,30 +0,0 @@
// Parse payload from jwt token
// Format: https://github.com/kontena/kube-oidc#openid-connect-and-kubernetes
import { base64 } from "../../client/utils/base64";
interface JwtPayload {
"azp": string;// "1077841816959-kkdh0lvq1au80qv4gtubotvgs9am4a95.apps.googleusercontent.com",
"aud": string;// "1077841816959-kkdh0lvq1au80qv4gtubotvgs9am4a95.apps.googleusercontent.com",
"sub": string;// "103613003764490648449",
"hd": string;// "redhat.com",
"email": string;// "echiang@redhat.com",
"email_verified": boolean; // true,
"at_hash": string;// "OGDOjIJ92FkatDBoCm8ydg",
"exp": number;// 1527203940,
"iss": string;// "https://accounts.google.com",
"iat": number;// 1527200340,
"name": string; // "Eric Chiang",
"picture": string; // "https://lh5.googleusercontent.com/-Cs2iHTXiETs/AAAAAAAAAAI/AAAAAAAAACM/0Q85UhZizjg/s96-c/photo.jpg",
"given_name": string; // "Eric",
"family_name": string; //"Chiang",
"locale": string; // "en"
}
export function parseJwt(token: string): Partial<JwtPayload> {
try {
const [header, payload, signature] = token.split(".");
return base64.decode(payload);
} catch (e) {
return {}
}
}

View File

@ -1,16 +1,12 @@
import { app } from "electron"
import { KubeConfig } from "@kubernetes/client-node"
import { readFileSync } from "fs"
import * as http from "http"
import { ServerOptions } from "http-proxy"
import * as url from "url"
import { v4 as uuid } from "uuid"
import logger from "./logger"
import { getFreePort } from "./port"
import { LensServer } from "./lens-server"
import { KubeAuthProxy } from "./kube-auth-proxy"
import { Cluster, ClusterPreferences } from "./cluster"
import { userStore } from "../common/user-store"
export class ContextHandler {
public contextName: string
@ -24,14 +20,12 @@ export class ContextHandler {
protected apiTarget: ServerOptions
protected proxyTarget: ServerOptions
protected clusterUrl: url.UrlWithStringQuery
protected localServer: LensServer
protected proxyServer: KubeAuthProxy
protected clientCert: string
protected clientKey: string
protected secureApiConnection = true
protected defaultNamespace: string
protected port: number
protected proxyPort: number
protected kubernetesApi: string
protected prometheusPath: string
@ -128,41 +122,6 @@ export class ContextHandler {
return this.apiTarget
}
public async getProxyTarget() {
if (this.proxyTarget) {
return this.proxyTarget;
}
this.proxyTarget = {
changeOrigin: true,
secure: false,
target: {
host: this.clusterUrl.host,
hostname: "localhost",
path: "/",
port: await this.resolvePort(),
protocol: "http://",
},
}
return this.proxyTarget;
}
protected async resolvePort(): Promise<number> {
if (this.port) return this.port
let serverPort: number = null
try {
serverPort = await getFreePort(49153, 49900) // the proxy will usually already be on 49152 so skip that
} catch(error) {
logger.error(error)
throw(error)
}
this.port = serverPort
return serverPort
}
protected async resolveProxyPort(): Promise<number> {
if (this.proxyPort) return this.proxyPort
@ -190,35 +149,7 @@ export class ContextHandler {
}
}
protected initServer(serverUrl: string, port: number) {
const userPrefs = userStore.getPreferences()
const envs = {
KUBE_CLUSTER_URL: serverUrl,
KUBE_CLUSTER_NAME: this.clusterName,
KUBERNETES_TLS_SKIP: "true",
KUBERNETES_NAMESPACE: this.defaultNamespace,
SESSION_SECRET: this.id,
LOCAL_SERVER_PORT: port.toString(),
KUBE_METRICS_URL: `${serverUrl}/api/v1/namespaces/${this.prometheusPath}/proxy`,
STATS_NAMESPACE_DEFAULT: this.prometheusPath.split("/")[0],
CHARTS_ENABLED: "true",
LENS_VERSION: app.getVersion(),
LENS_THEME: `kontena-${userPrefs.colorTheme}`,
NODE_ENV: "production",
}
logger.debug(`spinning up lens-server process with env: ${JSON.stringify(envs)}`)
this.localServer = new LensServer(serverUrl, envs)
}
public async ensureServer() {
if (!this.localServer) {
const currentCluster = this.kc.getCurrentCluster()
const clusterUrl = url.parse(currentCluster.server)
const serverPort = await this.resolvePort()
logger.info(`initializing server for ${clusterUrl.host} on port ${serverPort}`)
this.initServer(this.kubernetesApi, serverPort)
await this.localServer.run()
}
if (!this.proxyServer) {
const proxyPort = await this.resolveProxyPort()
const proxyEnv = Object.assign({}, process.env)
@ -231,10 +162,6 @@ export class ContextHandler {
}
public stopServer() {
if (this.localServer) {
this.localServer.exit()
this.localServer = null
}
if (this.proxyServer) {
this.proxyServer.exit()
this.proxyServer = null

View File

@ -1,63 +0,0 @@
import * as path from "path"
import { spawn, ChildProcess } from "child_process"
import logger from "./logger"
import * as tcpPortUsed from "tcp-port-used"
declare const __static: string;
const isDevelopment = process.env.NODE_ENV !== "production"
let serverPath: string = null
if (isDevelopment) {
serverPath = path.join(process.cwd(), "binaries", "server", process.platform, "lens-server")
} else {
serverPath = path.join(process.resourcesPath, "lens-server")
if (process.platform !== "win32") {
serverPath = `${serverPath}.txt`
}
}
if (process.platform === "win32") {
serverPath = `${serverPath}-${process.arch}.exe`
}
export class LensServer {
protected serverUrl: string = null
protected env: NodeJS.ProcessEnv = null
protected localServer: ChildProcess
constructor(serverUrl: string, env: NodeJS.ProcessEnv) {
this.serverUrl = serverUrl
this.env = env
}
public async run(): Promise<void> {
if (this.localServer) {
return new Promise((resolve, reject) => {
resolve()
})
}
this.localServer = spawn(serverPath, [], {
env: this.env,
cwd: __static
})
this.localServer.on("exit", (code) => {
logger.error(`server ${this.serverUrl} exited with code ${code}`)
this.localServer = null
})
this.localServer.stdout.on('data', (data) => {
logger.debug(`server ${this.serverUrl} stdout: ${data}`)
})
this.localServer.stderr.on('data', (data) => {
logger.debug(`server ${this.serverUrl} stderr: ${data}`)
})
return tcpPortUsed.waitUntilUsed(parseInt(this.env.LOCAL_SERVER_PORT), 500, 10000)
}
public exit() {
if (this.localServer) {
logger.debug(`Stopping local server: ${this.serverUrl}`)
this.localServer.kill()
this.localServer = null
}
}
}

View File

@ -35,11 +35,7 @@ export class LensProxy {
this.handleRequest(proxy, req, res);
}.bind(this));
proxyServer.on("upgrade", function(req: http.IncomingMessage, socket: Socket, head: Buffer) {
if (this.isRemoteShellRequired(req)) {
this.proxyWsUpgrade(proxy, req, socket, head)
} else {
this.handleWsUpgrade(req, socket, head)
}
this.handleWsUpgrade(req, socket, head)
}.bind(this));
proxyServer.on("error", (err) => {
@ -135,8 +131,6 @@ export class LensProxy {
delete req.headers.authorization
req.url = req.url.replace("/api-kube", "")
return await contextHandler.getApiTarget()
} else {
return await contextHandler.getProxyTarget()
}
}
@ -158,24 +152,13 @@ export class LensProxy {
return
}
contextHandler.ensureServer().then(async () => {
if (await this.router.route(cluster, req, res)) return
const proxyTarget = await this.getProxyTarget(req, contextHandler)
proxy.web(req, res, proxyTarget)
})
}
protected async proxyWsUpgrade(proxy: httpProxy, req: http.IncomingMessage, socket: Socket, head: Buffer) {
const cluster = this.clusterManager.getClusterForRequest(req)
const contextHandler = cluster.contextHandler
contextHandler.applyHeaders(req);
const reqUrl = url.parse(req.url, true)
const urlParams = reqUrl.query
for (const [key, value] of Object.entries(urlParams)) {
if (key !== "token") {
req.headers["x-lens-param-" + key] = value
if (proxyTarget) {
proxy.web(req, res, proxyTarget)
} else {
await this.router.route(cluster, req, res)
}
}
proxy.ws(req, socket, head, await contextHandler.getProxyTarget());
})
}
protected async handleWsUpgrade(req: http.IncomingMessage, socket: Socket, head: Buffer) {
@ -187,13 +170,6 @@ export class LensProxy {
wsServer.emit("connection", con, req);
});
}
protected isRemoteShellRequired(req: http.IncomingMessage) {
if (!LensProxy.localShellSessions) {
return true
}
return false;
}
}
export function listen(port: number, clusterManager: ClusterManager) {

View File

@ -1,16 +1,23 @@
import * as http from "http";
import { Cluster } from "./cluster";
import * as http from "http"
import * as path from "path"
import { Cluster } from "./cluster"
import { configRoute } from "./routes/config"
import { helmApi } from "./helm-api"
import { resourceApplierApi } from "./resource-applier-api"
import { kubeconfigRoute } from "./routes/kubeconfig"
import { metricsRoute } from "./routes/metrics"
import { watchRoute } from "./routes/watch"
import { readFile } from "fs"
// eslint-disable-next-line @typescript-eslint/no-var-requires
const Call = require('@hapi/call');
// eslint-disable-next-line @typescript-eslint/no-var-requires
const Subtext = require('@hapi/subtext');
declare const __static: string;
const assetsPath = path.join(__static, "build/client")
interface RouteParams {
[key: string]: string | undefined;
}
@ -67,10 +74,32 @@ export class Router {
return request
}
protected handleStaticFile(file: string, response: http.ServerResponse) {
const asset = path.join(assetsPath, file)
readFile(asset, (err, data) => {
if (err) {
response.statusCode = 404
} else {
response.write(data)
response.end()
}
})
}
protected addRoutes() {
// Static assets
this.router.add({ method: 'get', path: '/{path*}' }, (request: LensApiRequest) => {
const { response, params } = request
const file = params.path || "/index.html"
this.handleStaticFile(file, response)
})
this.router.add({ method: 'get', path: '/api/config' }, configRoute.routeConfig.bind(configRoute))
this.router.add({ method: 'get', path: '/api/kubeconfig/service-account/{namespace}/{account}' }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute))
// Watch API
this.router.add({ method: 'get', path: '/api/watch' }, watchRoute.routeWatch.bind(watchRoute))
// Metrics API
this.router.add({ method: 'post', path: '/api/metrics' }, metricsRoute.routeMetrics.bind(metricsRoute))

96
src/main/routes/watch.ts Normal file
View File

@ -0,0 +1,96 @@
import { LensApiRequest } from "../router"
import { LensApi } from "../lens-api"
import { Watch, KubeConfig, RuntimeRawExtension } from "@kubernetes/client-node"
import { ServerResponse } from "http"
import { Request } from "request"
import logger from "../logger"
class ApiWatcher {
private apiUrl: string
private response: ServerResponse
private watchRequest: Request
private watch: Watch
constructor(apiUrl: string, kubeConfig: KubeConfig, response: ServerResponse) {
this.apiUrl = apiUrl
this.watch = new Watch(kubeConfig)
this.response = response
}
public start() {
this.watchRequest = this.watch.watch(this.apiUrl, {}, this.watchHandler.bind(this), this.doneHandler.bind(this))
}
public stop() {
if (!this.watchRequest) { return }
this.watchRequest.abort()
}
private watchHandler(phase: string, obj: RuntimeRawExtension) {
this.sendEvent({
type: phase,
object: obj
})
}
private doneHandler(error: Error) {
if (error) {
logger.error("watch error: " + error.toString())
this.sendEvent({
type: "STREAM_END",
url: this.apiUrl,
status: 410,
})
return
}
this.start()
}
private sendEvent(evt: any, autoFlush = true) {
// convert to "text/event-stream" format
this.response.write(`data: ${JSON.stringify(evt)}\n\n`);
if (autoFlush) {
// @ts-ignore
this.response.flush()
}
}
}
class WatchRoute extends LensApi {
public async routeWatch(request: LensApiRequest) {
const { params, response, cluster} = request
const apis: string[] = request.query.getAll("api")
const watchers: ApiWatcher[] = []
if (!apis.length) {
this.respondJson(response, {
message: "Empty request. Query params 'api' are not provided.",
example: "?api=/api/v1/pods&api=/api/v1/nodes",
}, 400)
return
}
response.setHeader("Content-Type", "text/event-stream")
response.setHeader("Cache-Control", "no-cache")
response.setHeader("Connection", "keep-alive")
apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.contextHandler.kc, response)
watcher.start()
watchers.push(watcher)
})
request.raw.req.on("close", () => {
watchers.map(watcher => watcher.stop())
})
request.raw.req.on("end", () => {
watchers.map(watcher => watcher.stop())
})
}
}
export const watchRoute = new WatchRoute()