1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Watch api does not work for non-admins with lots of namespaces #1898 -- part 1

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-12 17:26:03 +02:00
parent 4a8079debc
commit 6dcb48da1a
4 changed files with 176 additions and 138 deletions

View File

@ -146,7 +146,7 @@ export class Router {
this.router.add({ method: "get", path: `${apiPrefix}/kubeconfig/service-account/{namespace}/{account}` }, kubeconfigRoute.routeServiceAccountRoute.bind(kubeconfigRoute));
// Watch API
this.router.add({ method: "get", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
this.router.add({ method: "post", path: `${apiPrefix}/watch` }, watchRoute.routeWatch.bind(watchRoute));
// Metrics API
this.router.add({ method: "post", path: `${apiPrefix}/metrics` }, metricsRoute.routeMetrics.bind(metricsRoute));

View File

@ -1,10 +1,27 @@
import type { KubeJsonApiData, KubeJsonApiError } from "../../renderer/api/kube-json-api";
import { LensApiRequest } from "../router";
import { LensApi } from "../lens-api";
import { Watch, KubeConfig } from "@kubernetes/client-node";
import { KubeConfig, Watch } from "@kubernetes/client-node";
import { ServerResponse } from "http";
import { Request } from "request";
import logger from "../logger";
export interface IKubeWatchEvent<T = KubeJsonApiData | KubeJsonApiError> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR" | "STREAM_END";
object?: T;
}
export interface IKubeWatchEventStreamEnd extends IKubeWatchEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IWatchRoutePayload {
apis: string[]; // kube-api url list for subscribing to watch events
}
class ApiWatcher {
private apiUrl: string;
private response: ServerResponse;
@ -33,7 +50,9 @@ class ApiWatcher {
}
public stop() {
if (!this.watchRequest) { return; }
if (!this.watchRequest) {
return;
}
if (this.processor) {
clearInterval(this.processor);
@ -42,11 +61,14 @@ class ApiWatcher {
try {
this.watchRequest.abort();
this.sendEvent({
const event: IKubeWatchEventStreamEnd = {
type: "STREAM_END",
url: this.apiUrl,
status: 410,
});
};
this.sendEvent(event);
logger.debug("watch aborted");
} catch (error) {
logger.error(`Watch abort errored:${error}`);
@ -65,34 +87,31 @@ class ApiWatcher {
this.watchRequest.abort();
}
private sendEvent(evt: any) {
// convert to "text/event-stream" format
this.response.write(`data: ${JSON.stringify(evt)}\n\n`);
private sendEvent(evt: IKubeWatchEvent) {
this.response.write(JSON.stringify(evt) + "\n");
}
}
class WatchRoute extends LensApi {
public async routeWatch(request: LensApiRequest) {
const { response, cluster} = request;
const apis: string[] = request.query.getAll("api");
public async routeWatch(request: LensApiRequest<IWatchRoutePayload>) {
const { response, cluster, payload } = request;
const watchers: ApiWatcher[] = [];
if (!apis.length) {
if (!payload?.apis?.length) {
this.respondJson(response, {
message: "Empty request. Query params 'api' are not provided.",
example: "?api=/api/v1/pods&api=/api/v1/nodes",
message: "watch apis list is empty"
}, 400);
return;
}
response.setHeader("Content-Type", "text/event-stream");
response.setHeader("Content-Type", "application/json");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
logger.debug(`watch using kubeconfig:${JSON.stringify(cluster.getProxyKubeconfig(), null, 2)}`);
apis.forEach(apiUrl => {
payload.apis.forEach(apiUrl => {
const watcher = new ApiWatcher(apiUrl, cluster.getProxyKubeconfig(), response);
watcher.start();

View File

@ -1,41 +1,41 @@
// Kubernetes watch-api consumer
import type { IKubeWatchEvent, IKubeWatchEventStreamEnd, IWatchRoutePayload } from "../../main/routes/watch-route";
import type { KubeObjectStore } from "../kube-object.store";
import type { KubeObject } from "./kube-object";
import { computed, observable, reaction } from "mobx";
import { stringify } from "querystring";
import { autobind, EventEmitter } from "../utils";
import { KubeJsonApiData } from "./kube-json-api";
import type { KubeObjectStore } from "../kube-object.store";
import { KubeJsonApiData, KubeJsonApiError } from "./kube-json-api";
import { ensureObjectSelfLink, KubeApi } from "./kube-api";
import { apiManager } from "./api-manager";
import { apiPrefix, isDevelopment } from "../../common/vars";
import { getHostedCluster } from "../../common/cluster-store";
import { apiPrefix, isDevelopment } from "../../common/vars";
import { apiManager } from "./api-manager";
export interface IKubeWatchEvent<T = any> {
type: "ADDED" | "MODIFIED" | "DELETED" | "ERROR";
object?: T;
}
export { IKubeWatchEvent, IKubeWatchEventStreamEnd }
export interface IKubeWatchRouteEvent {
type: "STREAM_END";
url: string;
status: number;
}
export interface IKubeWatchRouteQuery {
api: string | string[];
export interface IKubeWatchMessage<T extends KubeObject = any> {
data?: IKubeWatchEvent<KubeJsonApiData>
error?: IKubeWatchEvent<KubeJsonApiError>;
api?: KubeApi<T>;
store?: KubeObjectStore<T>;
}
@autobind()
export class KubeWatchApi {
protected evtSource: EventSource;
protected onData = new EventEmitter<[IKubeWatchEvent]>();
protected stream: ReadableStream<Uint8Array>; // https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams
protected subscribers = observable.map<KubeApi, number>();
protected reconnectTimeoutMs = 5000;
protected maxReconnectsOnError = 10;
protected reconnectAttempts = this.maxReconnectsOnError;
// events
onMessage = new EventEmitter<[IKubeWatchMessage]>();
constructor() {
reaction(() => this.activeApis, () => this.connect(), {
this.bindAutoConnect();
}
private bindAutoConnect() {
return reaction(() => this.activeApis, () => this.connect(), {
fireImmediately: true,
delay: 500,
});
@ -62,17 +62,13 @@ export class KubeWatchApi {
});
}
// FIXME: use POST to send apis for subscribing (list could be huge)
// TODO: try to use normal fetch res.body stream to consume watch-api updates
// https://github.com/lensapp/lens/issues/1898
protected async getQuery() {
protected async getWatchRoutePayload(): Promise<IWatchRoutePayload> {
const { namespaceStore } = await import("../components/+namespaces/namespace.store");
await namespaceStore.whenReady;
const { isAdmin } = getHostedCluster();
return {
api: this.activeApis.map(api => {
apis: this.activeApis.map(api => {
if (isAdmin && !api.isNamespaced) {
return api.getWatchUrl();
}
@ -86,117 +82,141 @@ export class KubeWatchApi {
};
}
// todo: maybe switch to websocket to avoid often reconnects
@autobind()
protected async connect() {
if (this.evtSource) this.disconnect(); // close previous connection
this.disconnect(); // close active connection first
const query = await this.getQuery();
const payload = await this.getWatchRoutePayload();
if (!this.activeApis.length || !query.api.length) {
if (!payload.apis.length) {
return;
}
const apiUrl = `${apiPrefix}/watch?${stringify(query)}`;
this.writeLog({
data: ["CONNECTING", payload.apis]
});
this.evtSource = new EventSource(apiUrl);
this.evtSource.onmessage = this.onMessage;
this.evtSource.onerror = this.onError;
this.writeLog("CONNECTING", query.api);
}
try {
const req = await fetch(`${apiPrefix}/watch`, {
method: "POST",
body: JSON.stringify(payload),
keepalive: true,
headers: {
"content-type": "application/json"
}
});
reconnect() {
if (!this.evtSource || this.evtSource.readyState !== EventSource.OPEN) {
this.reconnectAttempts = this.maxReconnectsOnError;
this.connect();
const reader = req.body.getReader();
const handleEvent = this.handleEvent.bind(this);
this.stream = new ReadableStream({
start(controller) {
return reader.read().then(function processEvent({ done, value }): Promise<void> {
if (done) {
controller.close();
return;
}
handleEvent(value);
controller.enqueue(value);
return reader.read().then(processEvent);
});
},
cancel() {
reader.cancel();
}
});
} catch (error) {
this.writeLog({
error: ["CONNECTION ERROR", error]
});
}
}
protected disconnect() {
if (!this.evtSource) return;
this.evtSource.close();
this.evtSource.onmessage = null;
this.evtSource = null;
}
protected onMessage(evt: MessageEvent) {
if (!evt.data) return;
const data = JSON.parse(evt.data);
if ((data as IKubeWatchEvent).object) {
this.onData.emit(data);
} else {
this.onRouteEvent(data);
protected async disconnect() {
if (this.stream) {
this.stream.cancel();
this.stream = null;
}
}
protected async onRouteEvent(event: IKubeWatchRouteEvent) {
if (event.type === "STREAM_END") {
this.disconnect();
const { apiBase, namespace } = KubeApi.parseApi(event.url);
const api = apiManager.getApi(apiBase);
protected handleEvent(eventStreamChunk: Uint8Array) {
try {
const jsonText = new TextDecoder().decode(eventStreamChunk);
const event: IKubeWatchEvent = JSON.parse(jsonText);
const message = this.getMessage(event);
this.onMessage.emit(message);
} catch (error) {
this.writeLog({
error: ["failed to parse watch-api event", error]
});
}
}
if (api) {
try {
await api.refreshResourceVersion({ namespace });
this.reconnect();
} catch (error) {
console.error("failed to refresh resource version", error);
protected getMessage(event: IKubeWatchEvent): IKubeWatchMessage {
const message: IKubeWatchMessage = {};
if (this.subscribers.size > 0) {
setTimeout(() => {
this.onRouteEvent(event);
}, 1000);
}
switch (event.type) {
case "ADDED":
case "DELETED":
case "MODIFIED": {
const data = event as IKubeWatchEvent<KubeJsonApiData>;
const api = apiManager.getApiByKind(data.object.kind, data.object.apiVersion);
message.data = data;
if (api) {
ensureObjectSelfLink(api, data.object);
const { namespace, resourceVersion } = data.object.metadata;
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
message.api = api;
message.store = apiManager.getStore(api);
}
break;
}
case "ERROR":
message.error = event as IKubeWatchEvent<KubeJsonApiError>;
break;
case "STREAM_END": {
this.onServerStreamEnd(event as IKubeWatchEventStreamEnd);
break;
}
}
return message;
}
protected async onServerStreamEnd(event: IKubeWatchEventStreamEnd) {
const { apiBase, namespace } = KubeApi.parseApi(event.url);
const api = apiManager.getApi(apiBase);
if (api) {
try {
await api.refreshResourceVersion({ namespace });
this.connect();
} catch (error) {
this.writeLog({
error: ["failed to reconnect after stream ending", { event, error }]
});
if (this.subscribers.size > 0) {
setTimeout(() => {
this.onServerStreamEnd(event);
}, 1000);
}
}
}
}
protected onError(evt: MessageEvent) {
const { reconnectAttempts: attemptsRemain, reconnectTimeoutMs } = this;
if (evt.eventPhase === EventSource.CLOSED) {
if (attemptsRemain > 0) {
this.reconnectAttempts--;
setTimeout(() => this.connect(), reconnectTimeoutMs);
}
}
}
protected writeLog(...data: any[]) {
protected writeLog({ data, error }: { data?: any[], error?: any[] } = {}) {
if (isDevelopment) {
console.log("%cKUBE-WATCH-API:", `font-weight: bold`, ...data);
const logStyle = `font-weight: bold; ${error ? "color: red;" : ""}`;
console.log("%cKUBE-WATCH-API:", logStyle, ...Array.from(data || error));
}
}
addListener(store: KubeObjectStore, callback: (evt: IKubeWatchEvent) => void) {
const listener = (evt: IKubeWatchEvent<KubeJsonApiData>) => {
if (evt.type === "ERROR") {
return; // e.g. evt.object.message == "too old resource version"
}
const { namespace, resourceVersion } = evt.object.metadata;
const api = apiManager.getApiByKind(evt.object.kind, evt.object.apiVersion);
api.setResourceVersion(namespace, resourceVersion);
api.setResourceVersion("", resourceVersion);
ensureObjectSelfLink(api, evt.object);
if (store == apiManager.getStore(api)) {
callback(evt);
}
};
this.onData.addListener(listener);
return () => this.onData.removeListener(listener);
}
reset() {
this.subscribers.clear();
}
}
export const kubeWatchApi = new KubeWatchApi();

View File

@ -1,7 +1,7 @@
import { action, observable, reaction } from "mobx";
import { autobind } from "./utils";
import { KubeObject } from "./api/kube-object";
import { IKubeWatchEvent, kubeWatchApi } from "./api/kube-watch-api";
import { IKubeWatchEvent, IKubeWatchMessage, kubeWatchApi } from "./api/kube-watch-api";
import { ItemStore } from "./item.store";
import { apiManager } from "./api/api-manager";
import { IKubeApiQueryParams, KubeApi } from "./api/kube-api";
@ -23,7 +23,6 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
constructor() {
super();
this.bindWatchEventsUpdater();
kubeWatchApi.addListener(this, this.onWatchApiEvent);
}
get query(): IKubeApiQueryParams {
@ -187,7 +186,12 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
protected eventsBuffer = observable<IKubeWatchEvent<KubeJsonApiData>>([], { deep: false });
protected bindWatchEventsUpdater(delay = 1000) {
return reaction(() => this.eventsBuffer.toJS()[0], this.updateFromEventsBuffer, {
kubeWatchApi.onMessage.addListener(({ store, data }: IKubeWatchMessage<T>) => {
if (!this.isLoaded || store !== this) return;
this.eventsBuffer.push(data);
})
reaction(() => this.eventsBuffer[0], this.updateFromEventsBuffer, {
delay
});
}
@ -196,11 +200,6 @@ export abstract class KubeObjectStore<T extends KubeObject = any> extends ItemSt
return KubeApi.watchAll(...apis);
}
protected onWatchApiEvent(evt: IKubeWatchEvent) {
if (!this.isLoaded) return;
this.eventsBuffer.push(evt);
}
@action
protected updateFromEventsBuffer() {
if (!this.eventsBuffer.length) {