diff --git a/src/common/k8s-api/__tests__/kube-api.test.ts b/src/common/k8s-api/__tests__/kube-api.test.ts
index 08cc9f2642..7fffcbd1f6 100644
--- a/src/common/k8s-api/__tests__/kube-api.test.ts
+++ b/src/common/k8s-api/__tests__/kube-api.test.ts
@@ -23,6 +23,8 @@ import type { Request } from "node-fetch";
 import { forRemoteCluster, KubeApi } from "../kube-api";
 import { KubeJsonApi } from "../kube-json-api";
 import { KubeObject } from "../kube-object";
+import AbortController from "abort-controller";
+import { delay } from "../../utils/delay";
 
 class TestKubeObject extends KubeObject {
   static kind = "Pod";
@@ -325,5 +327,155 @@ describe("KubeApi", () => {
       api.watch({ namespace: "kube-system", timeout: 60 });
       expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
     });
+
+    it("aborts watch using abortController", async () => {
+      const spy = jest.spyOn(request, "getResponse");
+
+      const requestAborted = new Promise<void>((resolve) => {
+        (fetch as any).mockResponse(async (request: Request) => {
+          (request as any).signal.addEventListener("abort", () => {
+            resolve();
+          });
+
+          return {};
+        });
+      });
+
+      const abortController = new AbortController();
+
+      api.watch({
+        namespace: "kube-system",
+        timeout: 60,
+        abortController,
+      });
+
+      expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
+
+      await delay(100);
+
+      abortController.abort();
+
+      // Settles only once the signal handed to fetch has emitted "abort"
+      await requestAborted;
+    });
+
+    it("aborts watch using the returned function", async () => {
+      const requestAborted = new Promise<void>((resolve) => {
+        (fetch as any).mockResponse(async (request: Request) => {
+          (request as any).signal.addEventListener("abort", () => {
+            resolve();
+          });
+
+          return {};
+        });
+      });
+
+      const abort = api.watch({
+        namespace: "kube-system",
+        timeout: 60,
+      });
+
+      await delay(100);
+
+      // The returned function is called without a receiver, so it must not depend on `this`
+      abort();
+
+      await requestAborted;
+    });
+
+    describe("retries", () => {
+      it("if request ended", (done) => {
+        const spy = jest.spyOn(request, "getResponse");
+
+        // we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
+        jest.spyOn(global, "fetch").mockImplementation(async () => {
+          return {
+            ok: true,
+            body: {
+              on: (eventName: string, callback: () => void) => {
+                // End the request in 100ms.
+                if (eventName === "end") {
+                  setTimeout(() => {
+                    callback();
+                  }, 100);
+                }
+              },
+            },
+          } as any;
+        });
+
+        api.watch({
+          namespace: "kube-system",
+        });
+
+        expect(spy).toHaveBeenCalledTimes(1);
+
+        setTimeout(() => {
+          expect(spy).toHaveBeenCalledTimes(2);
+          done();
+        }, 2000);
+      });
+
+      it("if request not closed after timeout", (done) => {
+        const spy = jest.spyOn(request, "getResponse");
+
+        (fetch as any).mockResponse(async () => {
+          return {};
+        });
+
+        const timeoutSeconds = 1;
+
+        api.watch({
+          namespace: "kube-system",
+          timeout: timeoutSeconds,
+        });
+
+        expect(spy).toHaveBeenCalledTimes(1);
+
+        setTimeout(() => {
+          expect(spy).toHaveBeenCalledTimes(2);
+          done();
+        }, timeoutSeconds * 1000 * 1.2);
+      });
+
+      it("retries only once if request ends and timeout is set", (done) => {
+        const spy = jest.spyOn(request, "getResponse");
+
+        // we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
+        jest.spyOn(global, "fetch").mockImplementation(async () => {
+          return {
+            ok: true,
+            body: {
+              on: (eventName: string, callback: () => void) => {
+                // End the request in 100ms
+                if (eventName === "end") {
+                  setTimeout(() => {
+                    callback();
+                  }, 100);
+                }
+              },
+            },
+          } as any;
+        });
+
+        const timeoutSeconds = 0.5;
+
+        api.watch({
+          namespace: "kube-system",
+          timeout: timeoutSeconds,
+        });
+
+        expect(spy).toHaveBeenCalledTimes(1);
+
+        setTimeout(() => {
+          expect(spy).toHaveBeenCalledTimes(2);
+          done();
+        }, 2000);
+      });
+
+      afterEach(() => {
+        jest.clearAllMocks();
+      });
+    });
   });
 });
diff --git a/src/common/k8s-api/kube-api.ts b/src/common/k8s-api/kube-api.ts
index e4ec4a95e9..0684bc184b 100644
--- a/src/common/k8s-api/kube-api.ts
+++ b/src/common/k8s-api/kube-api.ts
@@ -208,6 +208,8 @@ export type KubeApiWatchOptions = {
   abortController?: AbortController
   watchId?: string;
   retry?: boolean;
+
+  // timeout in seconds
   timeout?: number;
 };
 
@@ -528,35 +530,94 @@ export class KubeApi {
+  /**
+   * Starts a watch request for this resource and returns a function that aborts it.
+   *
+   * The watch is restarted (a new `watch()` call with `retry: true`) when the response body
+   * emits "end", "close" or "error", or - when `opts.timeout` is set - when the request is
+   * still open after 1.1 times that timeout. Nothing is restarted once the request was
+   * aborted or a kubernetes error was received.
+   *
+   * Every restart creates its own AbortController, so the returned function only aborts the
+   * request started by this call; pass `opts.abortController` to be able to stop the watch
+   * across restarts.
+   *
+   * @param opts - watch options, `timeout` is in seconds
+   * @returns a function that aborts the request started by this call
+   */
   watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
     let errorReceived = false;
     let timedRetry: NodeJS.Timeout;
-    const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry, timeout } = opts;
+    const { namespace, callback = noop, retry, timeout } = opts;
     const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
 
-    signal.addEventListener("abort", () => {
+    // Create AbortController for this request
+    const abortController = new AbortController();
+
+    // If caller aborts, abort using request's abortController
+    if (opts.abortController) {
+      opts.abortController.signal.addEventListener("abort", () => {
+        abortController.abort();
+      });
+    }
+
+    abortController.signal.addEventListener("abort", () => {
       logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
       clearTimeout(timedRetry);
     });
 
     const requestParams = timeout ? { query: { timeoutSeconds: timeout }}: {};
     const watchUrl = this.getWatchUrl(namespace);
-    const responsePromise = this.request.getResponse(watchUrl, requestParams, { signal, timeout: 600_000 });
+    const responsePromise = this.request.getResponse(watchUrl, requestParams, {
+      signal: abortController.signal,
+      timeout: 600_000,
+    });
 
     logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
 
     responsePromise
       .then(response => {
+        // True if the current watch request was retried
+        let requestRetried = false;
+
         if (!response.ok) {
           logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
 
           return callback(null, response);
         }
 
+        // Add mechanism to retry in case timeoutSeconds is set but the watch wasn't timed out.
+        // This can happen if e.g. network is offline and AWS NLB is used.
+        if (timeout) {
+          setTimeout(() => {
+            // We only retry if we haven't retried, haven't aborted and haven't received k8s error
+            if (requestRetried || abortController.signal.aborted || errorReceived) {
+              return;
+            }
+
+            // Close current request
+            abortController.abort();
+
+            logger.info(`[KUBE-API] Watch timeout set, but not retried, retrying now`);
+
+            requestRetried = true;
+
+            // Clearing out any possible timeout, although we don't expect this to be set
+            clearTimeout(timedRetry);
+            this.watch({ ...opts, namespace, callback, watchId, retry: true });
+          // We wait longer than the timeout, as we expect the request to be retried with timeoutSeconds
+          }, timeout * 1000 * 1.1);
+        }
+
         ["end", "close", "error"].forEach((eventName) => {
           response.body.on(eventName, () => {
-            if (errorReceived) return; // kubernetes errors should be handled in a callback
-            if (signal.aborted) return;
+            // We only retry if we haven't retried, haven't aborted and haven't received k8s error
+            // kubernetes errors (=errorReceived set) should be handled in a callback
+            if (requestRetried || abortController.signal.aborted || errorReceived) {
+              return;
+            }
 
             logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
 
+            requestRetried = true;
+
             clearTimeout(timedRetry);
             timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
               this.watch({ ...opts, namespace, callback, watchId, retry: true });
@@ -587,7 +648,9 @@ export class KubeApi {
         callback(null, error);
       });
 
-    return abort;
+    // Return a closure instead of the bare method: `abort` needs the controller as `this`
+    // and throws when called detached, which is how callers invoke the returned function.
+    return () => abortController.abort();
   }
 
   protected modifyWatchEvent(event: IKubeWatchEvent) {