From 6dcb48da1a39653f3082a97dfb7c7e1a2cd3731c Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Jan 2021 17:26:03 +0200 Subject: [PATCH] Watch api does not work for non-admins with lots of namespaces #1898 -- part 1 Signed-off-by: Roman --- src/main/router.ts | 2 +- src/main/routes/watch-route.ts | 49 ++++-- src/renderer/api/kube-watch-api.ts | 248 ++++++++++++++++------------- src/renderer/kube-object.store.ts | 15 +- 4 files changed, 176 insertions(+), 138 deletions(-) diff --git a/src/main/router.ts b/src/main/router.ts index 896893a592..6e98d0ce0c 100644 --- a/src/main/router.ts +++ b/src/main/router.ts @@ -146,7 +146,7 @@ export class Router { this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute)); // Watch API - this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); + this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute)); // Metrics API this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute)); diff --git a/src/main/routes/watch-route.ts b/src/main/routes/watch-route.ts index eb9f007eae..1ed5b3ac4d 100644 --- a/src/main/routes/watch-route.ts +++ b/src/main/routes/watch-route.ts @@ -1,10 +1,27 @@ +import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api"; + import { LensApiRequest } from "../router"; import { LensApi } from "../lens-api"; -import { Watch, KubeConfig } from "@kubernetes/client-node"; +import { KubeConfig, Watch } from "@kubernetes/client-node"; import { ServerResponse } from "http"; import { Request } from "request"; import logger from "../logger"; +export interface IKubeWatchEvent { + type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END"; + object?: T; +} + +export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent { + type: "STREAM_END"; + url: string; + status: number; +} + +export interface IWatchRoutePayload { + apis: string[]; // kube-api url list for subscribing to watch events +} + class ApiWatcher { private apiUrl: string; private response: ServerResponse; @@ -33,7 +50,9 @@ class ApiWatcher { } public stop() { - if (!this.watchRequest) { return; } + if (!this.watchRequest) { + return; + } if (this.processor) { clearInterval(this.processor); @@ -42,11 +61,14 @@ class ApiWatcher { try { this.watchRequest.abort(); - this.sendEvent({ + + const event: IKubeWatchEventStreamEnd = { type: "STREAM_END", url: this.apiUrl, status: 410, - }); + }; + + this.sendEvent(event); logger.debug("watch aborted"); } catch (error) { logger.error(`Watch abort errored:${error}`); @@ -65,34 +87,31 @@ class ApiWatcher { this.watchRequest.abort(); } - private sendEvent(evt: any) { - // convert to "text/event-stream" format - this.response.write(`data: ${JSON.stringify(evt)}\n\n`); + private sendEvent(evt: IKubeWatchEvent) { + this.response.write(JSON.stringify(evt) + "\n"); } } class WatchRoute extends LensApi { - public async routeWatch(request: LensApiRequest) { - const { response, cluster} = request; - const apis: string[] = request.query.getAll("api"); + public async routeWatch(request: LensApiRequest) { + const { response, cluster, payload } = request; const watchers: ApiWatcher[] = []; - if (!apis.length) { + if (!payload?.apis?.length) { this.respondJson(response, { - message: "Empty request. Query params 'api' are not provided.", - example: "?api=/api/v1/pods&api=/api/v1/nodes", + message: "watch apis list is empty" }, 400); return; } - response.setHeader("Content-Type", "text/event-stream"); + response.setHeader("Content-Type", "application/json"); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`); - apis.forEach(apiUrl => { + payload.apis.forEach(apiUrl => { const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response); watcher.start(); diff --git a/src/renderer/api/kube-watch-api.ts b/src/renderer/api/kube-watch-api.ts index fe35a04baa..fcd84c75c0 100644 --- a/src/renderer/api/kube-watch-api.ts +++ b/src/renderer/api/kube-watch-api.ts @@ -1,41 +1,41 @@ // Kubernetes watch-api consumer +import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route"; +import type { KubeObjectStore } from "../kube-object.store"; +import type { KubeObject } from "./kube-object"; import { computed, observable, reaction } from "mobx"; -import { stringify } from "querystring"; import { autobind, EventEmitter } from "../utils"; -import { KubeJsonApiData } from "./kube-json-api"; -import type { KubeObjectStore } from "../kube-object.store"; +import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api"; import { ensureObjectSelfLink, KubeApi } from "./kube-api"; -import { apiManager } from "./api-manager"; -import { apiPrefix, isDevelopment } from "../../common/vars"; import { getHostedCluster } from "../../common/cluster-store"; +import { apiPrefix, isDevelopment } from "../../common/vars"; +import { apiManager } from "./api-manager"; -export interface IKubeWatchEvent { - type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR"; - object?: T; -} +export { IKubeWatchEvent, IKubeWatchEventStreamEnd } -export interface IKubeWatchRouteEvent { - type: "STREAM_END"; - url: string; - status: number; -} - -export interface IKubeWatchRouteQuery { - api: string | string[]; +export interface IKubeWatchMessage { + data?: IKubeWatchEvent + error?: IKubeWatchEvent; + api?: KubeApi; + store?: KubeObjectStore; } @autobind() export class KubeWatchApi { - protected evtSource: EventSource; - protected onData = new EventEmitter<[IKubeWatchEvent]>(); + protected stream: ReadableStream; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams protected subscribers = observable.map(); protected reconnectTimeoutMs = 5000; protected maxReconnectsOnError = 10; - protected reconnectAttempts = this.maxReconnectsOnError; + + // events + onMessage = new EventEmitter<[IKubeWatchMessage]>(); constructor() { - reaction(() => this.activeApis, () => this.connect(), { + this.bindAutoConnect(); + } + + private bindAutoConnect() { + return reaction(() => this.activeApis, () => this.connect(), { fireImmediately: true, delay: 500, }); @@ -62,17 +62,13 @@ export class KubeWatchApi { }); } - // FIXME: use POST to send apis for subscribing (list could be huge) - // TODO: try to use normal fetch res.body stream to consume watch-api updates - // https://github.com/lensapp/lens/issues/1898 - protected async getQuery() { + protected async getWatchRoutePayload(): Promise { const { namespaceStore } = await import("../components/+namespaces/namespace.store"); - await namespaceStore.whenReady; const { isAdmin } = getHostedCluster(); return { - api: this.activeApis.map(api => { + apis: this.activeApis.map(api => { if (isAdmin && !api.isNamespaced) { return api.getWatchUrl(); } @@ -86,117 +82,141 @@ export class KubeWatchApi { }; } - // todo: maybe switch to websocket to avoid often reconnects - @autobind() protected async connect() { - if (this.evtSource) this.disconnect(); // close previous connection + this.disconnect(); // close active connection first - const query = await this.getQuery(); + const payload = await this.getWatchRoutePayload(); - if (!this.activeApis.length || !query.api.length) { + if (!payload.apis.length) { return; } - const apiUrl = `${apiPrefix}/watch?${stringify(query)}`; + this.writeLog({ + data: ["CONNECTING", payload.apis] + }); - this.evtSource = new EventSource(apiUrl); - this.evtSource.onmessage = this.onMessage; - this.evtSource.onerror = this.onError; - this.writeLog("CONNECTING", query.api); - } + try { + const req = await fetch(`${apiPrefix}/watch`, { + method: "POST", + body: JSON.stringify(payload), + keepalive: true, + headers: { + "content-type": "application/json" + } + }); - reconnect() { - if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) { - this.reconnectAttempts = this.maxReconnectsOnError; - this.connect(); + const reader = req.body.getReader(); + const handleEvent = this.handleEvent.bind(this); + + this.stream = new ReadableStream({ + start(controller) { + return reader.read().then(function processEvent({ done, value }): Promise { + if (done) { + controller.close(); + return; + } + handleEvent(value); + controller.enqueue(value); + return reader.read().then(processEvent); + }); + }, + cancel() { + reader.cancel(); + } + }); + } catch (error) { + this.writeLog({ + error: ["CONNECTION ERROR", error] + }); } } - protected disconnect() { - if (!this.evtSource) return; - this.evtSource.close(); - this.evtSource.onmessage = null; - this.evtSource = null; - } - - protected onMessage(evt: MessageEvent) { - if (!evt.data) return; - const data = JSON.parse(evt.data); - - if ((data as IKubeWatchEvent).object) { - this.onData.emit(data); - } else { - this.onRouteEvent(data); + protected async disconnect() { + if (this.stream) { + this.stream.cancel(); + this.stream = null; } } - protected async onRouteEvent(event: IKubeWatchRouteEvent) { - if (event.type === "STREAM_END") { - this.disconnect(); - const { apiBase, namespace } = KubeApi.parseApi(event.url); - const api = apiManager.getApi(apiBase); + protected handleEvent(eventStreamChunk: Uint8Array) { + try { + const jsonText = new TextDecoder().decode(eventStreamChunk); + const event: IKubeWatchEvent = JSON.parse(jsonText); + const message = this.getMessage(event); + this.onMessage.emit(message); + } catch (error) { + this.writeLog({ + error: ["failed to parse watch-api event", error] + }); + } + } - if (api) { - try { - await api.refreshResourceVersion({ namespace }); - this.reconnect(); - } catch (error) { - console.error("failed to refresh resource version", error); + protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage { + const message: IKubeWatchMessage = {}; - if (this.subscribers.size > 0) { - setTimeout(() => { - this.onRouteEvent(event); - }, 1000); - } + switch (event.type) { + case "ADDED": + case "DELETED": + case "MODIFIED": { + const data = event as IKubeWatchEvent; + const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion); + + message.data = data; + + if (api) { + ensureObjectSelfLink(api, data.object); + + const { namespace, resourceVersion } = data.object.metadata; + api.setResourceVersion(namespace, resourceVersion); + api.setResourceVersion("", resourceVersion); + + message.api = api; + message.store = apiManager.getStore(api); + } + break; + } + + case "ERROR": + message.error = event as IKubeWatchEvent; + break; + + case "STREAM_END": { + this.onServerStreamEnd(event as IKubeWatchEventStreamEnd); + break; + } + } + + return message; + } + + protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd) { + const { apiBase, namespace } = KubeApi.parseApi(event.url); + const api = apiManager.getApi(apiBase); + + if (api) { + try { + await api.refreshResourceVersion({ namespace }); + this.connect(); + } catch (error) { + this.writeLog({ + error: ["failed to reconnect after stream ending", { event, error }] + }); + + if (this.subscribers.size > 0) { + setTimeout(() => { + this.onServerStreamEnd(event); + }, 1000); } } } } - protected onError(evt: MessageEvent) { - const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this; - - if (evt.eventPhase === EventSource.CLOSED) { - if (attemptsRemain > 0) { - this.reconnectAttempts--; - setTimeout(() => this.connect(), reconnectTimeoutMs); - } - } - } - - protected writeLog(...data: any[]) { + protected writeLog({ data, error }: { data?: any[], error?: any[] } = {}) { if (isDevelopment) { - console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data); + const logStyle = `font-weight: bold; ${error ? "color: red;" : ""}`; + console.log("%cKUBE-WATCH-API:", logStyle, ...Array.from(data || error)); } } - - addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) { - const listener = (evt: IKubeWatchEvent) => { - if (evt.type === "ERROR") { - return; // e.g. evt.object.message == "too old resource version" - } - - const { namespace, resourceVersion } = evt.object.metadata; - const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion); - - api.setResourceVersion(namespace, resourceVersion); - api.setResourceVersion("", resourceVersion); - - ensureObjectSelfLink(api, evt.object); - - if (store == apiManager.getStore(api)) { - callback(evt); - } - }; - - this.onData.addListener(listener); - - return () => this.onData.removeListener(listener); - } - - reset() { - this.subscribers.clear(); - } } export const kubeWatchApi = new KubeWatchApi(); diff --git a/src/renderer/kube-object.store.ts b/src/renderer/kube-object.store.ts index 3ea6d3f3c3..490cea6f1f 100644 --- a/src/renderer/kube-object.store.ts +++ b/src/renderer/kube-object.store.ts @@ -1,7 +1,7 @@ import { action, observable, reaction } from "mobx"; import { autobind } from "./utils"; import { KubeObject } from "./api/kube-object"; -import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api"; +import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api"; import { ItemStore } from "./item.store"; import { apiManager } from "./api/api-manager"; import { IKubeApiQueryParams, KubeApi } from "./api/kube-api"; @@ -23,7 +23,6 @@ export abstract class KubeObjectStore extends ItemSt constructor() { super(); this.bindWatchEventsUpdater(); - kubeWatchApi.addListener(this, this.onWatchApiEvent); } get query(): IKubeApiQueryParams { @@ -187,7 +186,12 @@ export abstract class KubeObjectStore extends ItemSt protected eventsBuffer = observable>([], { deep: false }); protected bindWatchEventsUpdater(delay = 1000) { - return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, { + kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage) => { + if (!this.isLoaded || store !== this) return; + this.eventsBuffer.push(data); + }) + + reaction(() => this.eventsBuffer[0], this.updateFromEventsBuffer, { delay }); } @@ -196,11 +200,6 @@ export abstract class KubeObjectStore extends ItemSt return KubeApi.watchAll(...apis); } - protected onWatchApiEvent(evt: IKubeWatchEvent) { - if (!this.isLoaded) return; - this.eventsBuffer.push(evt); - } - @action protected updateFromEventsBuffer() { if (!this.eventsBuffer.length) {