From b85900e8ecf694db31bd69a0124a76e378605edf Mon Sep 17 00:00:00 2001 From: Mateus Meyer Jiacomelli Date: Fri, 19 May 2023 16:28:12 +0000 Subject: [PATCH] Using new /messages websocket endpoint if available --- src/app/openlp-websocket.ts | 36 ++++++++++ src/app/openlp.service.ts | 134 +++++++++++++++++++++--------------- src/app/responses.ts | 12 ++++ src/app/utils.ts | 12 ++++ 4 files changed, 139 insertions(+), 55 deletions(-) create mode 100644 src/app/openlp-websocket.ts create mode 100644 src/app/utils.ts diff --git a/src/app/openlp-websocket.ts b/src/app/openlp-websocket.ts new file mode 100644 index 0000000..a3dbb09 --- /dev/null +++ b/src/app/openlp-websocket.ts @@ -0,0 +1,36 @@ + +import { Type } from '@angular/core'; +import { Observable } from 'rxjs'; + +export function createWebSocket( + host: string, + wsPort: number, + deserialize: (input: string) => T, + endpoint = '' +): Observable { + return new Observable((observer) => { + const ws = new WebSocket(`ws://${host}:${wsPort}/${endpoint}`); + ws.onmessage = (e) => { + const reader = new FileReader(); + reader.onload = () => { + const data = deserialize(JSON.parse(reader.result as string)); + observer.next(data); + }; + reader.readAsText(e.data); + }; + ws.onerror = () => { + observer.error(); + }; + ws.onclose = () => { + observer.complete(); + }; + + return () => { + // Removing listeners to avoid loop + ws.onmessage = null; + ws.onclose = null; + ws.onerror = null; + ws.close(); + }; + }); +} diff --git a/src/app/openlp.service.ts b/src/app/openlp.service.ts index 0cafd06..9af3c05 100644 --- a/src/app/openlp.service.ts +++ b/src/app/openlp.service.ts @@ -12,21 +12,13 @@ import { MainView, SystemInformation, Credentials, - AuthToken + AuthToken, + Message, + MessageType } from './responses'; import { environment } from '../environments/environment'; - - -const deserialize = (json, cls) => { - const inst = new cls(); - for (const p in json) { - if (!json.hasOwnProperty(p)) { - continue; - } - inst[p] = json[p]; - } - return inst; -}; +import { createWebSocket } from './openlp-websocket'; +import { deserialize } from './utils'; const httpOptions = { headers: new HttpHeaders({ @@ -48,12 +40,14 @@ export class OpenLPService { public apiRevision: number | null; private host: string; public stateChanged$: EventEmitter; + public messageReceived$: EventEmitter>; public webSocketStateChanged$: EventEmitter; private isTwelveHourTime = true; private webSocketTimeoutHandle: any = 0; - private ws: WebSocket = null; - private retrieveSystemInformationSubscription: Subscription; + private _stateWebSocketSubscription: Subscription; + private _messageWebSocketSubscription: Subscription; + private _retrieveSystemInformationSubscription: Subscription; constructor(private http: HttpClient) { const host = window.location.hostname; @@ -68,6 +62,7 @@ export class OpenLPService { this.host = host; this.stateChanged$ = new EventEmitter(); this.webSocketStateChanged$ = new EventEmitter(); + this.messageReceived$ = new EventEmitter>(); this.createWebSocket(); } @@ -220,13 +215,10 @@ export class OpenLPService { } get webSocketStatus(): WebSocketStatus { - if (this.ws) { - switch (this.ws.readyState) { - case WebSocket.OPEN: - return WebSocketStatus.Open; - } + if (!this._stateWebSocketSubscription || this._stateWebSocketSubscription.closed) { + return WebSocketStatus.Closed; } - return WebSocketStatus.Closed; + return WebSocketStatus.Open; } reconnectWebSocketIfNeeded() { @@ -237,38 +229,75 @@ export class OpenLPService { createWebSocket() { this.clearWebSocketTimeoutHandle(); - if (this.retrieveSystemInformationSubscription) { + if (this._retrieveSystemInformationSubscription) { // Cancels ongoing request to avoid connection flooding - this.retrieveSystemInformationSubscription.unsubscribe(); + this._retrieveSystemInformationSubscription.unsubscribe(); } - this.retrieveSystemInformationSubscription = this.retrieveSystemInformation() + this._retrieveSystemInformationSubscription = this.retrieveSystemInformation() .pipe( shareReplay(1), - finalize(() => this.retrieveSystemInformationSubscription = null) + finalize(() => this._retrieveSystemInformationSubscription = null) ) - .subscribe(info => { - if (this.ws) { - // Removing listeners to avoid loop - this.ws.onmessage = null; - this.ws.onclose = null; - this.ws.onerror = null; - this.ws.close(); - this.webSocketStateChanged$.emit(WebSocketStatus.Closed); - } - const ws = this.ws = new WebSocket(`ws://${this.host}:${info.websocket_port}`); - ws.onopen = () => { - this.webSocketStateChanged$.emit(WebSocketStatus.Open); - }; - ws.onmessage = this.readWebSocketMessage; - ws.onerror = this.handleWebSocketError; - ws.onclose = () => { - this.webSocketStateChanged$.emit(WebSocketStatus.Closed); - this.handleWebSocketError(); - }; - }, _ => this.handleWebSocketError()); + .subscribe({ + next: info => { + this.createStateWebsocketConnection(info.websocket_port); + + if (this.assertApiVersionMinimum(2, 4)) { + this.createMessageWebsocketConnection(info.websocket_port); + } + }, + error: _ => this.reconnectWebSocket() + }); } - private handleWebSocketError = () => { + private createStateWebsocketConnection(websocketPort: number) { + if (this._stateWebSocketSubscription) { + this._stateWebSocketSubscription.unsubscribe(); + } + + let firstMessage = true; + + this._stateWebSocketSubscription = createWebSocket( + this.host, + websocketPort, + (input: any) => deserialize(input.results, State) + ).subscribe({ + next: (state) => { + if (firstMessage) { + this.webSocketStateChanged$.emit(WebSocketStatus.Open); + firstMessage = false; + } + this.handleStateChange(state); + }, + error: (e) => { + this.webSocketStateChanged$.emit(WebSocketStatus.Closed); + this.reconnectWebSocket(); + }, + complete: () => { + this.webSocketStateChanged$.emit(WebSocketStatus.Closed); + this.reconnectWebSocket(); + } + }); + } + + private createMessageWebsocketConnection(websocketPort: number) { + if (this._messageWebSocketSubscription) { + this._messageWebSocketSubscription.unsubscribe(); + } + + this._stateWebSocketSubscription = createWebSocket( + this.host, + websocketPort, + (input: any) => deserialize(input, Message), + 'messages', + ).subscribe({ + next: (message) => this.handleMessage(message), + error: (e) => this.reconnectWebSocket(), + complete: () => this.reconnectWebSocket() + }); + } + + private reconnectWebSocket = () => { this.clearWebSocketTimeoutHandle(); this.webSocketTimeoutHandle = setTimeout(() => { this.createWebSocket(); @@ -281,17 +310,12 @@ export class OpenLPService { } } - private readWebSocketMessage = (event: MessageEvent) => { - const reader = new FileReader(); - reader.onload = () => { - const state = deserialize(JSON.parse(reader.result as string).results, State); - this.handleStateChange(state); - }; - reader.readAsText(event.data); - }; - handleStateChange(state: State) { this.isTwelveHourTime = state.twelve; this.stateChanged$.emit(state); } + + handleMessage(message: Message) { + this.messageReceived$.emit(message); + } } diff --git a/src/app/responses.ts b/src/app/responses.ts index 19c3332..d4095c8 100644 --- a/src/app/responses.ts +++ b/src/app/responses.ts @@ -81,3 +81,15 @@ export interface Credentials { export interface AuthToken { token: string; } + +export class Message { + plugin: T['plugin']; + key: T['key']; + value: T['value']; +} + +export interface MessageType { + plugin: string; + key: string; + value: any; +} diff --git a/src/app/utils.ts b/src/app/utils.ts new file mode 100644 index 0000000..8253673 --- /dev/null +++ b/src/app/utils.ts @@ -0,0 +1,12 @@ +import { Type } from '@angular/core'; + +export function deserialize(json: any, cls: Type): T { + const inst = new cls(); + for (const p in json) { + if (!json.hasOwnProperty(p)) { + continue; + } + inst[p] = json[p]; + } + return inst; +};