
import {Buffer} from 'buffer'
import { EventEmitter } from 'events';
import { Client, connect, IClientOptions } from 'mqtt';
import { emitter, getStoreId, getUserToken, getUserId, UserStoreChanged, UserStoreOperation, UserStoreType } from '../helpers/userStore';
import { createLogger } from '../logger';
import loggerBuffer from '../loggerBuffer';
import { APP, AppConfig } from '../config';

const logger = createLogger({component: 'broker'})

let overrideBrokerUrl: string|null = null

export interface StoreEvent<T=unknown> {
    storeId: string
    topic: string
    payload: T
}

export interface ForwardMessage {
    ts: number
    messageId?: number
    payload: unknown
}

export interface BrokerMessage<T> {
    payload: T
}

class Broker extends EventEmitter {

    client: Client|null = null
    userId: string = ""
    storeId: string = ""

    connected: boolean = false

    constructor() {
        super()
        
        this.processLogger()

        emitter.on(UserStoreType.configLoader, (ev: UserStoreChanged) => {
            if (ev.op !== UserStoreOperation.UPDATED) {
                return
            }
            const appConfig = ev.value as AppConfig
            if (appConfig.brokerUrl) {
                logger.info(`Using MQTT URL ${appConfig.brokerUrl}`)
                overrideBrokerUrl = appConfig.brokerUrl
            }
            this.reconnect()
        })

        emitter.on(UserStoreType.userToken, (ev: UserStoreChanged) => {
            if (ev.op === UserStoreOperation.UPDATED) {
                logger.debug("Broker: Token changed, try connection")
                this.reconnect()
            }
            if (ev.op === UserStoreOperation.REMOVED) {
                logger.debug("Broker: Token removed, disconnect")
                this.close()
            }
        })

        this.reconnect()
    }

    processLogger() {
        // process logger queue
        const b = this
        loggerBuffer.onAddCallback = () => {
            if (!b.connected) return
            // logger.debug(`--- Processing ${loggerBuffer.size()} items in logger queue`)
            while(loggerBuffer.size()) {
                const msg = loggerBuffer.shift()
                if (!msg) continue
                b.publishUser('logger', msg)
            }
        }
    }

    createClient(): Client|null {

        const userToken = getUserToken() || ''
        if (!userToken || userToken.length < 10) {
            logger.debug('token not set')
            return null
        }

        this.storeId = getStoreId() || ""
        this.userId = getUserId() || ""

        if (localStorage.brokerUrl) {
            logger.warn(`Using broker url from localStorage.brokerUrl=${localStorage.brokerUrl}`)
        }

        const brokerUrl = localStorage.brokerUrl || overrideBrokerUrl || process.env.REACT_APP_MQTT_URI || APP.MQTT_URI

        // logger.debug(`Connect to broker ${brokerUrl}`)
        const client = connect(brokerUrl, {
            clientId: "customer_app_" + (Math.random() * Date.now()).toString().replace(".", ""),
            username: userToken,
            password: "mqtt",
            connectTimeout: 15 * 1000
        } as IClientOptions)

        client.on('message', (topic, message) => {
            this.emit("message", { topic, message })
            const parts = topic.split("/")
            if (parts.length > 2 && parts[0] === "user") {
                const eventPath = parts.splice(2).join(".")
                // logger.debug(`emit(${eventPath}, ${message})`)
                const payload = this.parse(message)
                this.emit(eventPath, payload)
            }
            if (parts.length > 2 && parts[0] === "store") {
                const storeId = parts[1]
                const topic = parts.slice(2).join(".")
                // logger.debug(`emit(storeId=${storeId}, topic=${topic}, payload=${message})`)
                let {payload} = this.parse(message)
                const p = payload as any
                this.emit('store', {storeId, topic, payload: p.payload || p})
                this.emit(`store.${topic}`, {storeId, topic, payload: p.payload || p})
            }
        })

        client.on('disconnect', () => {
            this.connected = false
            this.emit("disconnected")
            this.unsubscribeUser('#')
            logger.info("Broker disconnected")
        })

        client.on('reconnect', () => {
            logger.debug("broker reconnected")
            this.emit("reconnect")
        })

        client.on('close', () => {
            // logger.debug("broker.close")
        })

        client.on('connect', () => {
            this.connected = true
            this.emit("connected")
            this.subscribeUser('#')
            emitter.emit(UserStoreType.broker, UserStoreOperation.UPDATED)
            // logger.info("Broker connected")
        })

        client.on('error', (err) => {
            this.emit("error", err)
            logger.debug(`Connection error: ${err.message}`)
        })

        return client
    }

    parse(raw: Buffer): BrokerMessage<unknown> {
      const payload = JSON.parse(raw.toString())
      const forward = payload as ForwardMessage
      if (forward.messageId && forward.payload && forward.ts) {
          return {payload: forward.payload}
      }
      return {payload} as BrokerMessage<unknown>
    }

    close(): void {
        if (this.client) this.client.end()
        this.client = null
    }

    reconnect(): void {
        this.close()
        this.client = this.createClient()
    }

    publish(topic: string, payload: any) {
        this.client?.publish(topic, payload, {}, (err: Error | undefined) => {
            if (err) {
                logger.error(`broker.publish error ${err.message}`)
                this.emit("error", err)
            }
            // logger.debug(`Published on ${topic}`)
        })
    }

    publishUser(topicPart: string, payload: any) {
        const t = this.getUserTopic(topicPart)
        if (!t) return null
        return this.publish(t, payload)
    }

    subscribeUser(topicPart: string): string|null {
        const t = this.getUserTopic(topicPart)
        if (!t) return null
        this.subscribe(t)
        return t
    }

    getUserTopic(topicPart: string): string|null {
        if (!this.userId) return null
        return `user/${this.userId}/${topicPart.replace(/\./g, '/')}`
    }

    subscribe(...topics: string[]) {
        topics.forEach(topic => {
            this.client?.subscribe(topic, (err: Error) => {
                if (err) {
                    logger.warn(`broker subscribe error topic=${topic}: ${err.message}`)
                    this.emit("error", err)
                    return
                }
                // logger.debug(`broker subscribed to topic=${topic}`)
            })
        })
    }

    unsubscribe(...topics: string[]) {
        topics.forEach(topic => {
            this.client?.unsubscribe(topic, (err: Error) => {
                if (err) {
                    logger.warn(`broker unsubscribe error topic=${topic}: ${err.message}`)
                    this.emit("error", err)
                    return
                }
                // logger.debug(`broker unsubscribed from topic=${topic}`)
            })
        })
    }

    unsubscribeUser(topicPart: string): string|null {
        const t = this.getUserTopic(topicPart)
        if (!t) return null
        this.unsubscribe(t)
        return t
    }

}

const broker: Broker = new Broker()

const getBroker = (): Broker => {
    return broker
}

export {
    getBroker
};

