1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

Retry watch if timeout set, but request was not retried (#4388)

* Retry watch if timeout set, but request was not retried.

Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com>

* Cancel current request if retried.

Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com>

* Clarify retried variable/comment

Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com>
This commit is contained in:
Panu Horsmalahti 2021-11-24 15:56:01 +02:00 committed by GitHub
parent 7d4a2a5735
commit 0ca8448d33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 6 deletions

View File

@ -23,6 +23,8 @@ import type { Request } from "node-fetch";
import { forRemoteCluster, KubeApi } from "../kube-api";
import { KubeJsonApi } from "../kube-json-api";
import { KubeObject } from "../kube-object";
import AbortController from "abort-controller";
import { delay } from "../../utils/delay";
class TestKubeObject extends KubeObject {
static kind = "Pod";
@ -325,5 +327,126 @@ describe("KubeApi", () => {
api.watch({ namespace: "kube-system", timeout: 60 });
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
});
it("aborts watch using abortController", async (done) => {
const spy = jest.spyOn(request, "getResponse");
(fetch as any).mockResponse(async (request: Request) => {
(request as any).signal.addEventListener("abort", () => {
done();
});
return {};
});
const abortController = new AbortController();
api.watch({
namespace: "kube-system",
timeout: 60,
abortController,
});
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
await delay(100);
abortController.abort();
});
describe("retries", () => {
it("if request ended", (done) => {
const spy = jest.spyOn(request, "getResponse");
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
jest.spyOn(global, "fetch").mockImplementation(async () => {
return {
ok: true,
body: {
on: (eventName: string, callback: Function) => {
// End the request in 100ms.
if (eventName === "end") {
setTimeout(() => {
callback();
}, 100);
}
},
},
} as any;
});
api.watch({
namespace: "kube-system",
});
expect(spy).toHaveBeenCalledTimes(1);
setTimeout(() => {
expect(spy).toHaveBeenCalledTimes(2);
done();
}, 2000);
});
it("if request not closed after timeout", (done) => {
const spy = jest.spyOn(request, "getResponse");
(fetch as any).mockResponse(async () => {
return {};
});
const timeoutSeconds = 1;
api.watch({
namespace: "kube-system",
timeout: timeoutSeconds,
});
expect(spy).toHaveBeenCalledTimes(1);
setTimeout(() => {
expect(spy).toHaveBeenCalledTimes(2);
done();
}, timeoutSeconds * 1000 * 1.2);
});
it("retries only once if request ends and timeout is set", (done) => {
const spy = jest.spyOn(request, "getResponse");
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
jest.spyOn(global, "fetch").mockImplementation(async () => {
return {
ok: true,
body: {
on: (eventName: string, callback: Function) => {
// End the request in 100ms
if (eventName === "end") {
setTimeout(() => {
callback();
}, 100);
}
},
},
} as any;
});
const timeoutSeconds = 0.5;
api.watch({
namespace: "kube-system",
timeout: timeoutSeconds,
});
expect(spy).toHaveBeenCalledTimes(1);
setTimeout(() => {
expect(spy).toHaveBeenCalledTimes(2);
done();
}, 2000);
});
afterEach(() => {
jest.clearAllMocks();
});
});
});
});

View File

@ -208,6 +208,8 @@ export type KubeApiWatchOptions = {
abortController?: AbortController
watchId?: string;
retry?: boolean;
// timeout in seconds
timeout?: number;
};
@ -528,35 +530,79 @@ export class KubeApi<T extends KubeObject> {
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
let errorReceived = false;
let timedRetry: NodeJS.Timeout;
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry, timeout } = opts;
const { namespace, callback = noop, retry, timeout } = opts;
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
signal.addEventListener("abort", () => {
// Create AbortController for this request
const abortController = new AbortController();
// If caller aborts, abort using request's abortController
if (opts.abortController) {
opts.abortController.signal.addEventListener("abort", () => {
abortController.abort();
});
}
abortController.signal.addEventListener("abort", () => {
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
clearTimeout(timedRetry);
});
const requestParams = timeout ? { query: { timeoutSeconds: timeout }}: {};
const watchUrl = this.getWatchUrl(namespace);
const responsePromise = this.request.getResponse(watchUrl, requestParams, { signal, timeout: 600_000 });
const responsePromise = this.request.getResponse(watchUrl, requestParams, {
signal: abortController.signal,
timeout: 600_000,
});
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
responsePromise
.then(response => {
// True if the current watch request was retried
let requestRetried = false;
if (!response.ok) {
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
return callback(null, response);
}
// Add mechanism to retry in case timeoutSeconds is set but the watch wasn't timed out.
// This can happen if e.g. network is offline and AWS NLB is used.
if (timeout) {
setTimeout(() => {
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
if (requestRetried || abortController.signal.aborted || errorReceived) {
return;
}
// Close current request
abortController.abort();
logger.info(`[KUBE-API] Watch timeout set, but not retried, retrying now`);
requestRetried = true;
// Clearing out any possible timeout, although we don't expect this to be set
clearTimeout(timedRetry);
this.watch({ ...opts, namespace, callback, watchId, retry: true });
// We wait longer than the timeout, as we expect the request to be retried with timeoutSeconds
}, timeout * 1000 * 1.1);
}
["end", "close", "error"].forEach((eventName) => {
response.body.on(eventName, () => {
if (errorReceived) return; // kubernetes errors should be handled in a callback
if (signal.aborted) return;
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
// kubernetes errors (=errorReceived set) should be handled in a callback
if (requestRetried || abortController.signal.aborted || errorReceived) {
return;
}
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
requestRetried = true;
clearTimeout(timedRetry);
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
this.watch({ ...opts, namespace, callback, watchId, retry: true });
@ -587,7 +633,7 @@ export class KubeApi<T extends KubeObject> {
callback(null, error);
});
return abort;
return abortController.abort;
}
protected modifyWatchEvent(event: IKubeWatchEvent<KubeJsonApiData>) {