Using new /messages websocket endpoint if available

This commit is contained in:
Mateus Meyer Jiacomelli 2023-05-19 16:28:12 +00:00 committed by Raoul Snyman
parent ecd6b6f88f
commit b85900e8ec
4 changed files with 139 additions and 55 deletions

View File

@ -0,0 +1,36 @@
import { Type } from '@angular/core';
import { Observable } from 'rxjs';
export function createWebSocket<T>(
host: string,
wsPort: number,
deserialize: (input: string) => T,
endpoint = ''
): Observable<T> {
return new Observable((observer) => {
const ws = new WebSocket(`ws://${host}:${wsPort}/${endpoint}`);
ws.onmessage = (e) => {
const reader = new FileReader();
reader.onload = () => {
const data = deserialize(JSON.parse(reader.result as string));
observer.next(data);
};
reader.readAsText(e.data);
};
ws.onerror = () => {
observer.error();
};
ws.onclose = () => {
observer.complete();
};
return () => {
// Removing listeners to avoid loop
ws.onmessage = null;
ws.onclose = null;
ws.onerror = null;
ws.close();
};
});
}

View File

@ -12,21 +12,13 @@ import {
MainView,
SystemInformation,
Credentials,
AuthToken
AuthToken,
Message,
MessageType
} from './responses';
import { environment } from '../environments/environment';
const deserialize = (json, cls) => {
const inst = new cls();
for (const p in json) {
if (!json.hasOwnProperty(p)) {
continue;
}
inst[p] = json[p];
}
return inst;
};
import { createWebSocket } from './openlp-websocket';
import { deserialize } from './utils';
const httpOptions = {
headers: new HttpHeaders({
@ -48,12 +40,14 @@ export class OpenLPService {
public apiRevision: number | null;
private host: string;
public stateChanged$: EventEmitter<State>;
public messageReceived$: EventEmitter<Message<MessageType>>;
public webSocketStateChanged$: EventEmitter<WebSocketStatus>;
private isTwelveHourTime = true;
private webSocketTimeoutHandle: any = 0;
private ws: WebSocket = null;
private retrieveSystemInformationSubscription: Subscription;
private _stateWebSocketSubscription: Subscription;
private _messageWebSocketSubscription: Subscription;
private _retrieveSystemInformationSubscription: Subscription;
constructor(private http: HttpClient) {
const host = window.location.hostname;
@ -68,6 +62,7 @@ export class OpenLPService {
this.host = host;
this.stateChanged$ = new EventEmitter<State>();
this.webSocketStateChanged$ = new EventEmitter<WebSocketStatus>();
this.messageReceived$ = new EventEmitter<Message<MessageType>>();
this.createWebSocket();
}
@ -220,13 +215,10 @@ export class OpenLPService {
}
get webSocketStatus(): WebSocketStatus {
if (this.ws) {
switch (this.ws.readyState) {
case WebSocket.OPEN:
return WebSocketStatus.Open;
}
if (!this._stateWebSocketSubscription || this._stateWebSocketSubscription.closed) {
return WebSocketStatus.Closed;
}
return WebSocketStatus.Closed;
return WebSocketStatus.Open;
}
reconnectWebSocketIfNeeded() {
@ -237,38 +229,75 @@ export class OpenLPService {
createWebSocket() {
this.clearWebSocketTimeoutHandle();
if (this.retrieveSystemInformationSubscription) {
if (this._retrieveSystemInformationSubscription) {
// Cancels ongoing request to avoid connection flooding
this.retrieveSystemInformationSubscription.unsubscribe();
this._retrieveSystemInformationSubscription.unsubscribe();
}
this.retrieveSystemInformationSubscription = this.retrieveSystemInformation()
this._retrieveSystemInformationSubscription = this.retrieveSystemInformation()
.pipe(
shareReplay(1),
finalize(() => this.retrieveSystemInformationSubscription = null)
finalize(() => this._retrieveSystemInformationSubscription = null)
)
.subscribe(info => {
if (this.ws) {
// Removing listeners to avoid loop
this.ws.onmessage = null;
this.ws.onclose = null;
this.ws.onerror = null;
this.ws.close();
this.webSocketStateChanged$.emit(WebSocketStatus.Closed);
}
const ws = this.ws = new WebSocket(`ws://${this.host}:${info.websocket_port}`);
ws.onopen = () => {
this.webSocketStateChanged$.emit(WebSocketStatus.Open);
};
ws.onmessage = this.readWebSocketMessage;
ws.onerror = this.handleWebSocketError;
ws.onclose = () => {
this.webSocketStateChanged$.emit(WebSocketStatus.Closed);
this.handleWebSocketError();
};
}, _ => this.handleWebSocketError());
.subscribe({
next: info => {
this.createStateWebsocketConnection(info.websocket_port);
if (this.assertApiVersionMinimum(2, 4)) {
this.createMessageWebsocketConnection(info.websocket_port);
}
},
error: _ => this.reconnectWebSocket()
});
}
private handleWebSocketError = () => {
private createStateWebsocketConnection(websocketPort: number) {
if (this._stateWebSocketSubscription) {
this._stateWebSocketSubscription.unsubscribe();
}
let firstMessage = true;
this._stateWebSocketSubscription = createWebSocket(
this.host,
websocketPort,
(input: any) => deserialize(input.results, State)
).subscribe({
next: (state) => {
if (firstMessage) {
this.webSocketStateChanged$.emit(WebSocketStatus.Open);
firstMessage = false;
}
this.handleStateChange(state);
},
error: (e) => {
this.webSocketStateChanged$.emit(WebSocketStatus.Closed);
this.reconnectWebSocket();
},
complete: () => {
this.webSocketStateChanged$.emit(WebSocketStatus.Closed);
this.reconnectWebSocket();
}
});
}
private createMessageWebsocketConnection(websocketPort: number) {
if (this._messageWebSocketSubscription) {
this._messageWebSocketSubscription.unsubscribe();
}
this._stateWebSocketSubscription = createWebSocket(
this.host,
websocketPort,
(input: any) => deserialize(input, Message),
'messages',
).subscribe({
next: (message) => this.handleMessage(message),
error: (e) => this.reconnectWebSocket(),
complete: () => this.reconnectWebSocket()
});
}
private reconnectWebSocket = () => {
this.clearWebSocketTimeoutHandle();
this.webSocketTimeoutHandle = setTimeout(() => {
this.createWebSocket();
@ -281,17 +310,12 @@ export class OpenLPService {
}
}
private readWebSocketMessage = (event: MessageEvent<any>) => {
const reader = new FileReader();
reader.onload = () => {
const state = deserialize(JSON.parse(reader.result as string).results, State);
this.handleStateChange(state);
};
reader.readAsText(event.data);
};
handleStateChange(state: State) {
this.isTwelveHourTime = state.twelve;
this.stateChanged$.emit(state);
}
handleMessage(message: Message<MessageType>) {
this.messageReceived$.emit(message);
}
}

View File

@ -81,3 +81,15 @@ export interface Credentials {
export interface AuthToken {
token: string;
}
export class Message<T extends MessageType> {
plugin: T['plugin'];
key: T['key'];
value: T['value'];
}
export interface MessageType {
plugin: string;
key: string;
value: any;
}

12
src/app/utils.ts Normal file
View File

@ -0,0 +1,12 @@
import { Type } from '@angular/core';
export function deserialize<T>(json: any, cls: Type<T>): T {
const inst = new cls();
for (const p in json) {
if (!json.hasOwnProperty(p)) {
continue;
}
inst[p] = json[p];
}
return inst;
};