mirror of
https://github.com/lensapp/lens.git
synced 2025-05-20 05:10:56 +00:00
Retry watch if timeout set, but request was not retried (#4388)
* Retry watch if timeout set, but request was not retried. Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com> * Cancel current request if retried. Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com> * Clarify retried variable/comment Signed-off-by: Panu Horsmalahti <phorsmalahti@mirantis.com>
This commit is contained in:
parent
7d4a2a5735
commit
0ca8448d33
@ -23,6 +23,8 @@ import type { Request } from "node-fetch";
|
|||||||
import { forRemoteCluster, KubeApi } from "../kube-api";
|
import { forRemoteCluster, KubeApi } from "../kube-api";
|
||||||
import { KubeJsonApi } from "../kube-json-api";
|
import { KubeJsonApi } from "../kube-json-api";
|
||||||
import { KubeObject } from "../kube-object";
|
import { KubeObject } from "../kube-object";
|
||||||
|
import AbortController from "abort-controller";
|
||||||
|
import { delay } from "../../utils/delay";
|
||||||
|
|
||||||
class TestKubeObject extends KubeObject {
|
class TestKubeObject extends KubeObject {
|
||||||
static kind = "Pod";
|
static kind = "Pod";
|
||||||
@ -325,5 +327,126 @@ describe("KubeApi", () => {
|
|||||||
api.watch({ namespace: "kube-system", timeout: 60 });
|
api.watch({ namespace: "kube-system", timeout: 60 });
|
||||||
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
|
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("aborts watch using abortController", async (done) => {
|
||||||
|
const spy = jest.spyOn(request, "getResponse");
|
||||||
|
|
||||||
|
(fetch as any).mockResponse(async (request: Request) => {
|
||||||
|
(request as any).signal.addEventListener("abort", () => {
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
|
||||||
|
return {};
|
||||||
|
});
|
||||||
|
|
||||||
|
const abortController = new AbortController();
|
||||||
|
|
||||||
|
api.watch({
|
||||||
|
namespace: "kube-system",
|
||||||
|
timeout: 60,
|
||||||
|
abortController,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(spy).toHaveBeenCalledWith("/api/v1/namespaces/kube-system/pods?watch=1&resourceVersion=", { query: { timeoutSeconds: 60 }}, expect.anything());
|
||||||
|
|
||||||
|
await delay(100);
|
||||||
|
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("retries", () => {
|
||||||
|
it("if request ended", (done) => {
|
||||||
|
const spy = jest.spyOn(request, "getResponse");
|
||||||
|
|
||||||
|
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
|
||||||
|
jest.spyOn(global, "fetch").mockImplementation(async () => {
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
body: {
|
||||||
|
on: (eventName: string, callback: Function) => {
|
||||||
|
// End the request in 100ms.
|
||||||
|
if (eventName === "end") {
|
||||||
|
setTimeout(() => {
|
||||||
|
callback();
|
||||||
|
}, 100);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} as any;
|
||||||
|
});
|
||||||
|
|
||||||
|
api.watch({
|
||||||
|
namespace: "kube-system",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(spy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
expect(spy).toHaveBeenCalledTimes(2);
|
||||||
|
done();
|
||||||
|
}, 2000);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("if request not closed after timeout", (done) => {
|
||||||
|
const spy = jest.spyOn(request, "getResponse");
|
||||||
|
|
||||||
|
(fetch as any).mockResponse(async () => {
|
||||||
|
return {};
|
||||||
|
});
|
||||||
|
|
||||||
|
const timeoutSeconds = 1;
|
||||||
|
|
||||||
|
api.watch({
|
||||||
|
namespace: "kube-system",
|
||||||
|
timeout: timeoutSeconds,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(spy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
expect(spy).toHaveBeenCalledTimes(2);
|
||||||
|
done();
|
||||||
|
}, timeoutSeconds * 1000 * 1.2);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries only once if request ends and timeout is set", (done) => {
|
||||||
|
const spy = jest.spyOn(request, "getResponse");
|
||||||
|
|
||||||
|
// we need to mock using jest as jest-fetch-mock doesn't support mocking the body completely
|
||||||
|
jest.spyOn(global, "fetch").mockImplementation(async () => {
|
||||||
|
return {
|
||||||
|
ok: true,
|
||||||
|
body: {
|
||||||
|
on: (eventName: string, callback: Function) => {
|
||||||
|
// End the request in 100ms
|
||||||
|
if (eventName === "end") {
|
||||||
|
setTimeout(() => {
|
||||||
|
callback();
|
||||||
|
}, 100);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
} as any;
|
||||||
|
});
|
||||||
|
|
||||||
|
const timeoutSeconds = 0.5;
|
||||||
|
|
||||||
|
api.watch({
|
||||||
|
namespace: "kube-system",
|
||||||
|
timeout: timeoutSeconds,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(spy).toHaveBeenCalledTimes(1);
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
expect(spy).toHaveBeenCalledTimes(2);
|
||||||
|
done();
|
||||||
|
}, 2000);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
jest.clearAllMocks();
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -208,6 +208,8 @@ export type KubeApiWatchOptions = {
|
|||||||
abortController?: AbortController
|
abortController?: AbortController
|
||||||
watchId?: string;
|
watchId?: string;
|
||||||
retry?: boolean;
|
retry?: boolean;
|
||||||
|
|
||||||
|
// timeout in seconds
|
||||||
timeout?: number;
|
timeout?: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -528,35 +530,79 @@ export class KubeApi<T extends KubeObject> {
|
|||||||
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
|
watch(opts: KubeApiWatchOptions = { namespace: "", retry: false }): () => void {
|
||||||
let errorReceived = false;
|
let errorReceived = false;
|
||||||
let timedRetry: NodeJS.Timeout;
|
let timedRetry: NodeJS.Timeout;
|
||||||
const { abortController: { abort, signal } = new AbortController(), namespace, callback = noop, retry, timeout } = opts;
|
const { namespace, callback = noop, retry, timeout } = opts;
|
||||||
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
|
const { watchId = `${this.kind.toLowerCase()}-${this.watchId++}` } = opts;
|
||||||
|
|
||||||
signal.addEventListener("abort", () => {
|
// Create AbortController for this request
|
||||||
|
const abortController = new AbortController();
|
||||||
|
|
||||||
|
// If caller aborts, abort using request's abortController
|
||||||
|
if (opts.abortController) {
|
||||||
|
opts.abortController.signal.addEventListener("abort", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
abortController.signal.addEventListener("abort", () => {
|
||||||
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
|
logger.info(`[KUBE-API] watch (${watchId}) aborted ${watchUrl}`);
|
||||||
clearTimeout(timedRetry);
|
clearTimeout(timedRetry);
|
||||||
});
|
});
|
||||||
|
|
||||||
const requestParams = timeout ? { query: { timeoutSeconds: timeout }}: {};
|
const requestParams = timeout ? { query: { timeoutSeconds: timeout }}: {};
|
||||||
const watchUrl = this.getWatchUrl(namespace);
|
const watchUrl = this.getWatchUrl(namespace);
|
||||||
const responsePromise = this.request.getResponse(watchUrl, requestParams, { signal, timeout: 600_000 });
|
const responsePromise = this.request.getResponse(watchUrl, requestParams, {
|
||||||
|
signal: abortController.signal,
|
||||||
|
timeout: 600_000,
|
||||||
|
});
|
||||||
|
|
||||||
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
|
logger.info(`[KUBE-API] watch (${watchId}) ${retry === true ? "retried" : "started"} ${watchUrl}`);
|
||||||
|
|
||||||
responsePromise
|
responsePromise
|
||||||
.then(response => {
|
.then(response => {
|
||||||
|
// True if the current watch request was retried
|
||||||
|
let requestRetried = false;
|
||||||
|
|
||||||
if (!response.ok) {
|
if (!response.ok) {
|
||||||
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
|
logger.warn(`[KUBE-API] watch (${watchId}) error response ${watchUrl}`, { status: response.status });
|
||||||
|
|
||||||
return callback(null, response);
|
return callback(null, response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add mechanism to retry in case timeoutSeconds is set but the watch wasn't timed out.
|
||||||
|
// This can happen if e.g. network is offline and AWS NLB is used.
|
||||||
|
if (timeout) {
|
||||||
|
setTimeout(() => {
|
||||||
|
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
|
||||||
|
if (requestRetried || abortController.signal.aborted || errorReceived) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close current request
|
||||||
|
abortController.abort();
|
||||||
|
|
||||||
|
logger.info(`[KUBE-API] Watch timeout set, but not retried, retrying now`);
|
||||||
|
|
||||||
|
requestRetried = true;
|
||||||
|
|
||||||
|
// Clearing out any possible timeout, although we don't expect this to be set
|
||||||
|
clearTimeout(timedRetry);
|
||||||
|
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||||
|
// We wait longer than the timeout, as we expect the request to be retried with timeoutSeconds
|
||||||
|
}, timeout * 1000 * 1.1);
|
||||||
|
}
|
||||||
|
|
||||||
["end", "close", "error"].forEach((eventName) => {
|
["end", "close", "error"].forEach((eventName) => {
|
||||||
response.body.on(eventName, () => {
|
response.body.on(eventName, () => {
|
||||||
if (errorReceived) return; // kubernetes errors should be handled in a callback
|
// We only retry if we haven't retried, haven't aborted and haven't received k8s error
|
||||||
if (signal.aborted) return;
|
// kubernetes errors (=errorReceived set) should be handled in a callback
|
||||||
|
if (requestRetried || abortController.signal.aborted || errorReceived) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
|
logger.info(`[KUBE-API] watch (${watchId}) ${eventName} ${watchUrl}`);
|
||||||
|
|
||||||
|
requestRetried = true;
|
||||||
|
|
||||||
clearTimeout(timedRetry);
|
clearTimeout(timedRetry);
|
||||||
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
timedRetry = setTimeout(() => { // we did not get any kubernetes errors so let's retry
|
||||||
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
this.watch({ ...opts, namespace, callback, watchId, retry: true });
|
||||||
@ -587,7 +633,7 @@ export class KubeApi<T extends KubeObject> {
|
|||||||
callback(null, error);
|
callback(null, error);
|
||||||
});
|
});
|
||||||
|
|
||||||
return abort;
|
return abortController.abort;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected modifyWatchEvent(event: IKubeWatchEvent<KubeJsonApiData>) {
|
protected modifyWatchEvent(event: IKubeWatchEvent<KubeJsonApiData>) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user