/** * Copyright (c) OpenLens Authors. All rights reserved. * Licensed under MIT License. See LICENSE in root directory for more information. */ // Base class for building all kubernetes apis import { isFunction, merge } from "lodash"; import { stringify } from "querystring"; import { apiKubePrefix, isDevelopment } from "../../common/vars"; import logger from "../../main/logger"; import { apiManager } from "./api-manager"; import { apiBase, apiKube } from "./index"; import { createKubeApiURL, parseKubeApi } from "./kube-api-parse"; import { KubeObjectConstructor, KubeObject, KubeStatus } from "./kube-object"; import byline from "byline"; import type { IKubeWatchEvent } from "./kube-watch-event"; import { KubeJsonApi, KubeJsonApiData } from "./kube-json-api"; import { noop, WrappedAbortController } from "../utils"; import type { RequestInit } from "node-fetch"; import type AbortController from "abort-controller"; import { Agent, AgentOptions } from "https"; import type { Patch } from "rfc6902"; /** * The base constructor options for {@link KubeApi} for use with child classes * of it. */ export interface BaseKubeApiOptions { /** * If the API uses a different API endpoint (e.g. apiBase) depending on the cluster version, * fallback API bases can be listed individually. * The first (existing) API base is used in the requests, if apiBase is not found. * This option only has effect if checkPreferredVersion is true. */ fallbackApiBases?: string[]; /** * If `true` then will check all declared apiBases against the kube api server * for the first accepted one. */ checkPreferredVersion?: boolean; /** * The api instance to use for making requests * * @default apiKube */ request?: KubeJsonApi; } /** * The options used for creating a `KubeApi` */ export interface IKubeApiOptions extends BaseKubeApiOptions { /** * The constructor for the kube objects returned from the API */ objectConstructor: KubeObjectConstructor; /** * base api-path for listing all resources, e.g. "/api/v1/pods" * * @deprecated It should be specified on the `objectConstructor` */ apiBase?: string; /** * @deprecated should be specified by `objectConstructor` */ isNamespaced?: boolean; /** * @deprecated should be specified by `objectConstructor` */ kind?: string; } export interface IKubeApiQueryParams { watch?: boolean | number; resourceVersion?: string; timeoutSeconds?: number; limit?: number; // doesn't work with ?watch continue?: string; // might be used with ?limit from second request labelSelector?: string | string[]; // restrict list of objects by their labels, e.g. labelSelector: ["label=value"] fieldSelector?: string | string[]; // restrict list of objects by their fields, e.g. fieldSelector: "field=name" } export interface KubeApiListOptions { namespace?: string; reqInit?: RequestInit; } export interface IKubePreferredVersion { preferredVersion?: { version: string; }; } export interface IKubeResourceList { resources: { kind: string; name: string; namespaced: boolean; singularName: string; storageVersionHash: string; verbs: string[]; }[]; } export interface ILocalKubeApiConfig { metadata: { uid: string; }; } export type PropagationPolicy = undefined | "Orphan" | "Foreground" | "Background"; /** * @deprecated */ export interface IKubeApiCluster extends ILocalKubeApiConfig { } export type PartialKubeObject = Partial> & { metadata?: Partial; }; export interface IRemoteKubeApiConfig { cluster: { server: string; caData?: string; skipTLSVerify?: boolean; }; user: { token?: string | (() => Promise); clientCertificateData?: string; clientKeyData?: string; }; } export function forCluster = KubeApi>(cluster: ILocalKubeApiConfig, kubeClass: KubeObjectConstructor, apiClass: new (apiOpts: IKubeApiOptions) => Y = null): KubeApi { const url = new URL(apiBase.config.serverAddress); const request = new KubeJsonApi({ serverAddress: apiBase.config.serverAddress, apiBase: apiKubePrefix, debug: isDevelopment, }, { headers: { "Host": `${cluster.metadata.uid}.localhost:${url.port}`, }, }); if (!apiClass) { apiClass = KubeApi as new (apiOpts: IKubeApiOptions) => Y; } return new apiClass({ objectConstructor: kubeClass, request, }); } export function forRemoteCluster = KubeApi>(config: IRemoteKubeApiConfig, kubeClass: KubeObjectConstructor, apiClass: new (apiOpts: IKubeApiOptions) => Y = null): Y { const reqInit: RequestInit = {}; const agentOptions: AgentOptions = {}; if (config.cluster.skipTLSVerify === true) { agentOptions.rejectUnauthorized = false; } if (config.user.clientCertificateData) { agentOptions.cert = config.user.clientCertificateData; } if (config.user.clientKeyData) { agentOptions.key = config.user.clientKeyData; } if (config.cluster.caData) { agentOptions.ca = config.cluster.caData; } if (Object.keys(agentOptions).length > 0) { reqInit.agent = new Agent(agentOptions); } const token = config.user.token; const request = new KubeJsonApi({ serverAddress: config.cluster.server, apiBase: "", debug: isDevelopment, ...(token ? { getRequestOptions: async () => ({ headers: { "Authorization": `Bearer ${isFunction(token) ? await token() : token}`, }, }), } : {}), }, reqInit); if (!apiClass) { apiClass = KubeApi as new (apiOpts: IKubeApiOptions) => Y; } return new apiClass({ objectConstructor: kubeClass as KubeObjectConstructor, request, }); } export function ensureObjectSelfLink(api: KubeApi, object: KubeJsonApiData) { if (!object.metadata.selfLink) { object.metadata.selfLink = createKubeApiURL({ apiPrefix: api.apiPrefix, apiVersion: api.apiVersionWithGroup, resource: api.apiResource, namespace: api.isNamespaced ? object.metadata.namespace : undefined, name: object.metadata.name, }); } } export type KubeApiWatchCallback = (data: IKubeWatchEvent, error: any) => void; export interface KubeApiWatchOptions { namespace: string; callback?: KubeApiWatchCallback; abortController?: AbortController; watchId?: string; retry?: boolean; // timeout in seconds timeout?: number; } export type KubeApiPatchType = "merge" | "json" | "strategic"; const patchTypeHeaders: Record = { "merge": "application/merge-patch+json", "json": "application/json-patch+json", "strategic": "application/strategic-merge-patch+json", }; export interface ResourceDescriptor { /** * The name of the kubernetes resource */ name: string; /** * The namespace that the resource lives in (if the resource is namespaced) * * Note: if not provided and the resource kind is namespaced, then this defaults to `"default"` */ namespace?: string; } export interface DeleteResourceDescriptor extends ResourceDescriptor { /** * This determinines how child resources should be handled by kubernetes * * @default "Background" */ propagationPolicy?: PropagationPolicy; } export class KubeApi { readonly kind: string; readonly apiVersion: string; apiBase: string; apiPrefix: string; apiGroup: string; apiVersionPreferred?: string; readonly apiResource: string; readonly isNamespaced: boolean; public objectConstructor: KubeObjectConstructor; protected request: KubeJsonApi; protected resourceVersions = new Map(); protected watchDisposer: () => void; private watchId = 1; constructor(protected options: IKubeApiOptions) { const { objectConstructor, request, kind, isNamespaced } = options; const { apiBase, apiPrefix, apiGroup, apiVersion, resource } = parseKubeApi(options.apiBase || objectConstructor.apiBase); this.options = options; this.kind = kind ?? objectConstructor.kind; this.isNamespaced = isNamespaced ?? objectConstructor.namespaced ?? false; this.apiBase = apiBase; this.apiPrefix = apiPrefix; this.apiGroup = apiGroup; this.apiVersion = apiVersion; this.apiResource = resource; this.request = request ?? apiKube; this.objectConstructor = objectConstructor; this.parseResponse = this.parseResponse.bind(this); apiManager.registerApi(this.apiBase, this); } get apiVersionWithGroup() { return [this.apiGroup, this.apiVersionPreferred ?? this.apiVersion] .filter(Boolean) .join("/"); } /** * Returns the latest API prefix/group that contains the required resource. * First tries options.apiBase, then urls in order from options.fallbackApiBases. */ private async getLatestApiPrefixGroup() { // Note that this.options.apiBase is the "full" url, whereas this.apiBase is parsed const apiBases = [this.options.apiBase, this.objectConstructor.apiBase, ...this.options.fallbackApiBases]; for (const apiUrl of apiBases) { if (!apiUrl) { continue; } try { // Split e.g. "/apis/extensions/v1beta1/ingresses" to parts const { apiPrefix, apiGroup, apiVersionWithGroup, resource } = parseKubeApi(apiUrl); // Request available resources const response = await this.request.get(`${apiPrefix}/${apiVersionWithGroup}`); // If the resource is found in the group, use this apiUrl if (response.resources?.find(kubeResource => kubeResource.name === resource)) { return { apiPrefix, apiGroup }; } } catch (error) { // Exception is ignored as we can try the next url } } throw new Error(`Can't find working API for the Kubernetes resource ${this.apiResource}`); } /** * Get the apiPrefix and apiGroup to be used for fetching the preferred version. */ private async getPreferredVersionPrefixGroup() { if (this.options.fallbackApiBases) { try { return await this.getLatestApiPrefixGroup(); } catch (error) { // If valid API wasn't found, log the error and return defaults below logger.error(`[KUBE-API]: ${error}`); } } return { apiPrefix: this.apiPrefix, apiGroup: this.apiGroup, }; } protected async checkPreferredVersion() { if (this.options.fallbackApiBases && !this.options.checkPreferredVersion) { throw new Error("checkPreferredVersion must be enabled if fallbackApiBases is set in KubeApi"); } if (this.options.checkPreferredVersion && this.apiVersionPreferred === undefined) { const { apiPrefix, apiGroup } = await this.getPreferredVersionPrefixGroup(); // The apiPrefix and apiGroup might change due to fallbackApiBases, so we must override them this.apiPrefix = apiPrefix; this.apiGroup = apiGroup; const url = [apiPrefix, apiGroup].filter(Boolean).join("/"); const res = await this.request.get(url); this.apiVersionPreferred = res?.preferredVersion?.version ?? null; if (this.apiVersionPreferred) { this.apiBase = this.computeApiBase(); apiManager.registerApi(this.apiBase, this); } } } setResourceVersion(namespace = "", newVersion: string) { this.resourceVersions.set(namespace, newVersion); } getResourceVersion(namespace = "") { return this.resourceVersions.get(namespace); } async refreshResourceVersion(params?: KubeApiListOptions) { return this.list(params, { limit: 1 }); } private computeApiBase(): string { return createKubeApiURL({ apiPrefix: this.apiPrefix, apiVersion: this.apiVersionWithGroup, resource: this.apiResource, }); } getUrl({ name, namespace }: Partial = {}, query?: Partial) { const resourcePath = createKubeApiURL({ apiPrefix: this.apiPrefix, apiVersion: this.apiVersionWithGroup, resource: this.apiResource, namespace: this.isNamespaced ? namespace ?? "default" : undefined, name, }); return resourcePath + (query ? `?${stringify(this.normalizeQuery(query))}` : ""); } protected normalizeQuery(query: Partial = {}) { if (query.labelSelector) { query.labelSelector = [query.labelSelector].flat().join(","); } if (query.fieldSelector) { query.fieldSelector = [query.fieldSelector].flat().join(","); } return query; } protected parseResponse(data: unknown, namespace?: string): T | T[] | null { if (!data) return null; const KubeObjectConstructor = this.objectConstructor; // process items list response, check before single item since there is overlap if (KubeObject.isJsonApiDataList(data, KubeObject.isPartialJsonApiData)) { const { apiVersion, items, metadata } = data; this.setResourceVersion(namespace, metadata.resourceVersion); this.setResourceVersion("", metadata.resourceVersion); return items.map((item) => { const object = new KubeObjectConstructor({ kind: this.kind, apiVersion, ...item, }); ensureObjectSelfLink(this, object); return object; }); } // process a single item if (KubeObject.isJsonApiData(data)) { const object = new KubeObjectConstructor(data); ensureObjectSelfLink(this, object); return object; } // custom apis might return array for list response, e.g. users, groups, etc. if (Array.isArray(data)) { return data.map(data => new KubeObjectConstructor(data)); } return null; } async list({ namespace = "", reqInit }: KubeApiListOptions = {}, query?: IKubeApiQueryParams): Promise { await this.checkPreferredVersion(); const url = this.getUrl({ namespace }); const res = await this.request.get(url, { query }, reqInit); const parsed = this.parseResponse(res, namespace); if (Array.isArray(parsed)) { return parsed; } if (!parsed) { return null; } throw new Error(`GET multiple request to ${url} returned not an array: ${JSON.stringify(parsed)}`); } async get(desc: ResourceDescriptor, query?: IKubeApiQueryParams): Promise { await this.checkPreferredVersion(); const url = this.getUrl(desc); const res = await this.request.get(url, { query }); const parsed = this.parseResponse(res); if (Array.isArray(parsed)) { throw new Error(`GET single request to ${url} returned an array: ${JSON.stringify(parsed)}`); } return parsed; } async create({ name, namespace }: Partial, data?: PartialKubeObject): Promise { await this.checkPreferredVersion(); const apiUrl = this.getUrl({ namespace }); const res = await this.request.post(apiUrl, { data: merge(data, { kind: this.kind, apiVersion: this.apiVersionWithGroup, metadata: { name, namespace, }, }), }); const parsed = this.parseResponse(res); if (Array.isArray(parsed)) { throw new Error(`POST request to ${apiUrl} returned an array: ${JSON.stringify(parsed)}`); } return parsed; } async update({ name, namespace }: ResourceDescriptor, data: PartialKubeObject): Promise { await this.checkPreferredVersion(); const apiUrl = this.getUrl({ namespace, name }); const res = await this.request.put(apiUrl, { data: merge(data, { metadata: { name, namespace, }, }), }); const parsed = this.parseResponse(res); if (Array.isArray(parsed)) { throw new Error(`PUT request to ${apiUrl} returned an array: ${JSON.stringify(parsed)}`); } return parsed; } async patch(desc: ResourceDescriptor, data?: PartialKubeObject | Patch, strategy: KubeApiPatchType = "strategic") { await this.checkPreferredVersion(); const apiUrl = this.getUrl(desc); const res = await this.request.patch(apiUrl, { data }, { headers: { "content-type": patchTypeHeaders[strategy], }, }); const parsed = this.parseResponse(res); if (Array.isArray(parsed)) { throw new Error(`PATCH request to ${apiUrl} returned an array: ${JSON.stringify(parsed)}`); } return parsed as T | null; } async delete({ propagationPolicy = "Background", ...desc }: DeleteResourceDescriptor) { await this.checkPreferredVersion(); const apiUrl = this.getUrl(desc); return this.request.del(apiUrl, { query: { propagationPolicy, }, }); } getWatchUrl(namespace = "", query: IKubeApiQueryParams = {}) { return this.getUrl({ namespace }, { watch: 1, resourceVersion: this.getResourceVersion(namespace), ...query, }); } watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void { let errorReceived = false; let timedRetry: NodeJS.Timeout; const { namespace, callback = noop, retry, timeout } = opts; const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts; // Create AbortController for this request const abortController = new WrappedAbortController(opts.abortController); abortController.signal.addEventListener("abort", () => { logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`); clearTimeout(timedRetry); }); const requestParams = timeout ? { query: { timeoutSeconds: timeout }} : {}; const watchUrl = this.getWatchUrl(namespace); const responsePromise = this.request.getResponse(watchUrl, requestParams, { signal: abortController.signal, timeout: 600_000, }); logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`); responsePromise .then(response => { // True if the current watch request was retried let requestRetried = false; if (!response.ok) { logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status }); return callback(null, response); } // Add mechanism to retry in case timeoutSeconds is set but the watch wasn't timed out. // This can happen if e.g. network is offline and AWS NLB is used. if (timeout) { setTimeout(() => { // We only retry if we haven't retried, haven't aborted and haven't received k8s error if (requestRetried || abortController.signal.aborted || errorReceived) { return; } // Close current request abortController.abort(); logger.info(`[KUBE-API] Watch timeout set, but not retried, retrying now`); requestRetried = true; // Clearing out any possible timeout, although we don't expect this to be set clearTimeout(timedRetry); this.watch({ ...opts, namespace, callback, watchId, retry: true }); // We wait longer than the timeout, as we expect the request to be retried with timeoutSeconds }, timeout * 1000 * 1.1); } ["end", "close", "error"].forEach((eventName) => { response.body.on(eventName, () => { // We only retry if we haven't retried, haven't aborted and haven't received k8s error // kubernetes errors (=errorReceived set) should be handled in a callback if (requestRetried || abortController.signal.aborted || errorReceived) { return; } logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`); requestRetried = true; clearTimeout(timedRetry); timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry this.watch({ ...opts, namespace, callback, watchId, retry: true }); }, 1000); }); }); byline(response.body).on("data", (line) => { try { const event: IKubeWatchEvent = JSON.parse(line); if (event.type === "ERROR" && event.object.kind === "Status") { errorReceived = true; return callback(null, new KubeStatus(event.object as any)); } this.modifyWatchEvent(event); callback(event, null); } catch (ignore) { // ignore parse errors } }); }) .catch(error => { logger.error(`[KUBE-API] watch (${watchId}) throwed ${watchUrl}`, error); callback(null, error); }); return () => { abortController.abort(); }; } protected modifyWatchEvent(event: IKubeWatchEvent) { if (event.type === "ERROR") { return; } ensureObjectSelfLink(this, event.object); const { namespace, resourceVersion } = event.object.metadata; this.setResourceVersion(namespace, resourceVersion); this.setResourceVersion("", resourceVersion); } }