Home Reference Source

src/MessageHandler/Server/index.js

const zmq = require('zeromq');
const config = require('./../../config');
const db = require('./../../DatabaseHandler/index');
const _ = require('lodash');
const logger = require('./../../Logger');

let identity;
/** Array to save all connected clients. */
let connectedClients = [];

/**
 * Messaging Class for the Server component.
 */
class MessageServer {

   /**
   * Constructor for the MessageServer.
   * @property {object} sock Defines the router socket that represents the type of communication that can be handled.
   */
    constructor() {
        /** Describes the kind of socket. In this case a router socket. */
        this.sock = zmq.socket('router');
        this.sock.bindSync(`tcp://*:${config.app.port}`);
        this.sock.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1);
        logger.info(`Message Server listening on port ${config.app.port}`);

        this.startListening(this.sock);
    }

    /**
     * Sends a message to a specific peer.
     * @param {string} id Identity of the socket to talk back to a specific peer.
     * @param {string} topic The related topic that the message will be about.
     * @param {string} message Actual message content.
     */
    sendMessage = (id = identity, topic, message) => {
        logger.verbose(`Sending following message: ${message}.`);
        this.sock.send([id, topic, message]);
    }

    /**
     * Listens to incoming events on the socket.
     * @param {sock} sock The global socket that the Server will be listening on.
     */
    startListening = (sock) => {
        // can't convert to es6
        this.sock.on('message', function onMessage() {
            let args = Array.apply(null, arguments);
            identity = args[0];
            let identityReadable = identity.toString('utf8');

            let value = args[2].toString('utf8');
            let topic = args[1].toString('utf8');

            if (!_.some(connectedClients, { readable: identityReadable }))
                sock.send([identity, 'ping', 'server']);

            switch (topic) {
                case 'humidity':
                    value = JSON.parse(value);
                    db.insert('humidity', value.value, value.time, identityReadable);
                    break;
                case 'temperature':
                    value = JSON.parse(value);
                    db.insert('temperature', value.value, value.time, identityReadable);
                    break;
                case 'illuminance':
                    value = JSON.parse(value);
                    db.insert('illuminance', value.value, value.time, identityReadable);
                    break;
                case 'pong':
                    /**
                     * Handles the currently connected clients and whether they are still connected
                     */
                    if (!_.some(connectedClients, { readable: identityReadable })) {
                        setInterval(() => {
                            let index = _.findIndex(connectedClients, { readable: identityReadable });
                            
                            sock.send([identity, 'ping', 'server'], null, function (err) {
                                if (err != undefined)
                                    connectedClients[index].online = false;
                                else
                                    connectedClients[index].online = true;
                            });
                        }, 5000);
                    }
                    connectedClients.push({
                        key: connectedClients.length,
                        buffer: identity,
                        readable: identityReadable
                    });
                    connectedClients = _.uniqBy(connectedClients, 'readable');
                    break;
                default:
                    logger.error('error');
            }

            logger.verbose(`received a message related to: ${topic} - containing message: ${value}`);
        });
    }

    /**
     * @return {Array} Method that returns all currently connected clients as array.
     */
    returnConnectedClients = () => {
        return connectedClients;
    }
}

module.exports = MessageServer;