import { Injectable, Output, EventEmitter, Input } from '@angular/core';
import { Subject, Subscription } from 'rxjs';
import makeWebSocketObservable, { GetWebSocketResponses, normalClosureMessage } from 'rxjs-websockets';
import { share, switchMap } from 'rxjs/operators';
import { environment } from '../../environments/environment';

@Injectable({
    providedIn: 'root'
})
export class WebSocketNotificationService {

    input = new Subject<string | ArrayBuffer | Blob>();
    @Input() wsKey: string | undefined;
    @Output() initializedWebsocket = new EventEmitter();
    @Output() closedWebsocket = new EventEmitter();
    socket: any;
    messages: any;
    messagesSubscription: Subscription | undefined;
    reconnectTimeout: any;

    constructor() {
    }

    initWebsocket() {
        if (window.location.hostname == 'localhost') {
            this.socket = makeWebSocketObservable(`ws://${environment.websocket.host}:${environment.websocket.port}/ws/notification/${this.wsKey}/`);
        } else {
            this.socket = makeWebSocketObservable(`wss://${environment.websocket.host}:${environment.websocket.port}/ws/notification/${this.wsKey}/`);
        }
        this.messages = this.socket.pipe(
            // the observable produces a value once the websocket has been opened
            switchMap((getResponses: GetWebSocketResponses) => {
                return getResponses(this.input);
            }),
            share(),
        );

        this.messagesSubscription = this.messages.subscribe(
            (message: string) => {
                // console.log('received message:', message);
                // respond to server
                // this.input.next('i got your message')
            },
            (error: Error) => {
                const { message } = error;
                if (message === normalClosureMessage) {
                    // console.log('server closed the websocket connection normally');
                } else {
                    console.log('socket was disconnected due to error:', message);
                    console.log(`reconnect after ${environment.configuration.websocketReconnectTimeout} secs`);
                    this.closedWebsocket.emit();
                    this.reconnectTimeout = setTimeout(() => {
                        this.initWebsocket();
                    }, environment.configuration.websocketReconnectTimeout*10000);
                }
            },
            () => {
                // The clean termination only happens in response to the last
                // subscription to the observable being unsubscribed, any
                // other closure is considered an error.
                // console.log('the connection was closed in response to the user');
            },
        );
        this.input.next('some data');
        this.initializedWebsocket.emit();
        return;
    }

    closeWebsocket() {
        // this also caused the websocket connection to be closed
        this.messagesSubscription?.unsubscribe();
    }

}
