1
0
mirror of https://github.com/lensapp/lens.git synced 2025-05-20 05:10:56 +00:00

auto-reconnect on online/offline status change, interval connection check

Signed-off-by: Roman <ixrock@gmail.com>
This commit is contained in:
Roman 2021-01-25 14:52:22 +02:00
parent 632e77b263
commit 14a0c9cdca

View File

@ -40,15 +40,34 @@ export class KubeWatchApi {
private namespaceStore: NamespaceStore;
private requestId = 0;
private isConnected = false;
private reader: ReadableStreamReader<string>;
private subscribers = observable.map<KubeApi, number>();
private splitter = "\n";
private reconnectTimeoutMs = 5000;
private maxReconnectsOnError = 10;
// events
public onMessage = new EventEmitter<[IKubeWatchMessage]>();
@computed get isActive(): boolean {
return this.apis.length > 0;
}
@computed get apis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat();
}
constructor() {
this.init();
}
@ -67,27 +86,14 @@ export class KubeWatchApi {
private bindAutoConnect() {
const connect = debounce(() => this.connect(), 1000);
return reaction(() => this.activeApis, connect, {
reaction(() => this.apis, connect, {
fireImmediately: true,
equals: comparer.structural,
});
}
@computed get activeApis(): string[] {
const { cluster, namespaceStore } = this;
const activeApis = Array.from(this.subscribers.keys());
return activeApis.map(api => {
if (!cluster.isAllowedResource(api.kind)) {
return [];
}
if (api.isNamespaced) {
return namespaceStore.getContextNamespaces().map(namespace => api.getWatchUrl(namespace));
} else {
return api.getWatchUrl();
}
}).flat();
window.addEventListener("online", () => this.connect());
window.addEventListener("offline", () => this.disconnect());
setInterval(() => this.connectionCheck(), 30000);
}
getSubscribersCount(api: KubeApi) {
@ -149,10 +155,18 @@ export class KubeWatchApi {
return unsubscribe;
}
protected async connect(apis = this.activeApis) {
protected connectionCheck() {
if (this.isConnected) return;
return this.connect();
}
protected async connect(apis = this.apis) {
this.disconnect(); // close active connections first
if (!apis.length) {
if (!navigator.onLine || !apis.length) {
this.isConnected = false;
return;
}
@ -185,6 +199,7 @@ export class KubeWatchApi {
const stream = request.body.pipeThrough(new TextDecoderStream());
const reader = stream.getReader();
this.isConnected = true;
this.reader = reader;
while (true) {
@ -192,18 +207,21 @@ export class KubeWatchApi {
if (done) break; // exit
const events = (jsonBuffer + value).split(this.splitter);
const events = (jsonBuffer + value).split("\n");
jsonBuffer = this.processBuffer(events);
}
} catch (error) {
this.log({ message: error });
} finally {
this.isConnected = false;
}
}
protected disconnect() {
this.reader?.cancel();
this.reader = null;
this.isConnected = false;
}
// process received stream events, returns unprocessed buffer chunk if any
@ -272,14 +290,12 @@ export class KubeWatchApi {
this.connect();
} catch (error) {
this.log({
message: new Error("Failed to reconnect on stream end"),
meta: { error, event },
message: new Error(`Failed to connect on single stream end: ${error}`),
meta: { event, error },
});
if (this.subscribers.size > 0) {
setTimeout(() => {
this.onServerStreamEnd(event);
}, 1000);
if (this.isActive) {
setTimeout(() => this.onServerStreamEnd(event), 1000);
}
}
}