mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Retry watch if timeout set, but request was not retried (#4388)
* Retry watch if timeout set, but request was not retried. Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com> * Cancel current request if retried. Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com> * Clarify retried variable/comment Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com>
This commit is contained in:
parent
7d4a2a5735
commit
0ca8448d33
@ -23,6 +23,8 @@ import type { Request } from "node-fetch";
|
||||
import { forRemoteCluster, KubeApi } from "../kube-api";
|
||||
import { KubeJsonApi } from "../kube-json-api";
|
||||
import { KubeObject } from "../kube-object";
|
||||
import AbortController from "abort-controller";
|
||||
import { delay } from "../../utils/delay";
|
||||
|
||||
class TestKubeObject extends KubeObject {
|
||||
static kind = "Pod";
|
||||
@ -325,5 +327,126 @@ describe("KubeApi", () => {
|
||||
api.watch({ namespace: "kube-system", timeout: 60 });
|
||||
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
|
||||
});
|
||||
|
||||
it("aborts watch using abortController", async (done) => {
|
||||
const spy = jest.spyOn(request, "getResponse");
|
||||
|
||||
(fetch as any).mockResponse(async (request: Request) => {
|
||||
(request as any).signal.addEventListener("abort", () => {
|
||||
done();
|
||||
});
|
||||
|
||||
return {};
|
||||
});
|
||||
|
||||
const abortController = new AbortController();
|
||||
|
||||
api.watch({
|
||||
namespace: "kube-system",
|
||||
timeout: 60,
|
||||
abortController,
|
||||
});
|
||||
|
||||
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
|
||||
|
||||
await delay(100);
|
||||
|
||||
abortController.abort();
|
||||
});
|
||||
|
||||
describe("retries", () => {
|
||||
it("if request ended", (done) => {
|
||||
const spy = jest.spyOn(request, "getResponse");
|
||||
|
||||
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
|
||||
jest.spyOn(global, "fetch").mockImplementation(async () => {
|
||||
return {
|
||||
ok: true,
|
||||
body: {
|
||||
on: (eventName: string, callback: Function) => {
|
||||
// End the request in 100ms.
|
||||
if (eventName === "end") {
|
||||
setTimeout(() => {
|
||||
callback();
|
||||
}, 100);
|
||||
}
|
||||
},
|
||||
},
|
||||
} as any;
|
||||
});
|
||||
|
||||
api.watch({
|
||||
namespace: "kube-system",
|
||||
});
|
||||
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
|
||||
setTimeout(() => {
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
done();
|
||||
}, 2000);
|
||||
});
|
||||
|
||||
it("if request not closed after timeout", (done) => {
|
||||
const spy = jest.spyOn(request, "getResponse");
|
||||
|
||||
(fetch as any).mockResponse(async () => {
|
||||
return {};
|
||||
});
|
||||
|
||||
const timeoutSeconds = 1;
|
||||
|
||||
api.watch({
|
||||
namespace: "kube-system",
|
||||
timeout: timeoutSeconds,
|
||||
});
|
||||
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
|
||||
setTimeout(() => {
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
done();
|
||||
}, timeoutSeconds * 1000 * 1.2);
|
||||
});
|
||||
|
||||
it("retries only once if request ends and timeout is set", (done) => {
|
||||
const spy = jest.spyOn(request, "getResponse");
|
||||
|
||||
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
|
||||
jest.spyOn(global, "fetch").mockImplementation(async () => {
|
||||
return {
|
||||
ok: true,
|
||||
body: {
|
||||
on: (eventName: string, callback: Function) => {
|
||||
// End the request in 100ms
|
||||
if (eventName === "end") {
|
||||
setTimeout(() => {
|
||||
callback();
|
||||
}, 100);
|
||||
}
|
||||
},
|
||||
},
|
||||
} as any;
|
||||
});
|
||||
|
||||
const timeoutSeconds = 0.5;
|
||||
|
||||
api.watch({
|
||||
namespace: "kube-system",
|
||||
timeout: timeoutSeconds,
|
||||
});
|
||||
|
||||
expect(spy).toHaveBeenCalledTimes(1);
|
||||
|
||||
setTimeout(() => {
|
||||
expect(spy).toHaveBeenCalledTimes(2);
|
||||
done();
|
||||
}, 2000);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jest.clearAllMocks();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -208,6 +208,8 @@ export type KubeApiWatchOptions = {
|
||||
abortController?: AbortController
|
||||
watchId?: string;
|
||||
retry?: boolean;
|
||||
|
||||
// timeout in seconds
|
||||
timeout?: number;
|
||||
};
|
||||
|
||||
@ -528,35 +530,79 @@ export class KubeApi<T extends KubeObject> {
|
||||
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
|
||||
let errorReceived = false;
|
||||
let timedRetry: NodeJS.Timeout;
|
||||
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry, timeout } = opts;
|
||||
const { namespace, callback = noop, retry, timeout } = opts;
|
||||
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
|
||||
|
||||
signal.addEventListener("abort", () => {
|
||||
// Create AbortController for this request
|
||||
const abortController = new AbortController();
|
||||
|
||||
// If caller aborts, abort using request's abortController
|
||||
if (opts.abortController) {
|
||||
opts.abortController.signal.addEventListener("abort", () => {
|
||||
abortController.abort();
|
||||
});
|
||||
}
|
||||
|
||||
abortController.signal.addEventListener("abort", () => {
|
||||
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
|
||||
clearTimeout(timedRetry);
|
||||
});
|
||||
|
||||
const requestParams = timeout ? { query: { timeoutSeconds: timeout }}: {};
|
||||
const watchUrl = this.getWatchUrl(namespace);
|
||||
const responsePromise = this.request.getResponse(watchUrl, requestParams, { signal, timeout: 600_000 });
|
||||
const responsePromise = this.request.getResponse(watchUrl, requestParams, {
|
||||
signal: abortController.signal,
|
||||
timeout: 600_000,
|
||||
});
|
||||
|
||||
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
|
||||
|
||||
responsePromise
|
||||
.then(response => {
|
||||
// True if the current watch request was retried
|
||||
let requestRetried = false;
|
||||
|
||||
if (!response.ok) {
|
||||
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
|
||||
|
||||
return callback(null, response);
|
||||
}
|
||||
|
||||
// Add mechanism to retry in case timeoutSeconds is set but the watch wasn't timed out.
|
||||
// This can happen if e.g. network is offline and AWS NLB is used.
|
||||
if (timeout) {
|
||||
setTimeout(() => {
|
||||
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
|
||||
if (requestRetried || abortController.signal.aborted || errorReceived) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Close current request
|
||||
abortController.abort();
|
||||
|
||||
logger.info(`[KUBE-API] Watch timeout set, but not retried, retrying now`);
|
||||
|
||||
requestRetried = true;
|
||||
|
||||
// Clearing out any possible timeout, although we don't expect this to be set
|
||||
clearTimeout(timedRetry);
|
||||
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||
// We wait longer than the timeout, as we expect the request to be retried with timeoutSeconds
|
||||
}, timeout * 1000 * 1.1);
|
||||
}
|
||||
|
||||
["end", "close", "error"].forEach((eventName) => {
|
||||
response.body.on(eventName, () => {
|
||||
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
||||
if (signal.aborted) return;
|
||||
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
|
||||
// kubernetes errors (=errorReceived set) should be handled in a callback
|
||||
if (requestRetried || abortController.signal.aborted || errorReceived) {
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
|
||||
|
||||
requestRetried = true;
|
||||
|
||||
clearTimeout(timedRetry);
|
||||
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||
@ -587,7 +633,7 @@ export class KubeApi<T extends KubeObject> {
|
||||
callback(null, error);
|
||||
});
|
||||
|
||||
return abort;
|
||||
return abortController.abort;
|
||||
}
|
||||
|
||||
protected modifyWatchEvent(event: IKubeWatchEvent<KubeJsonApiData>) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user