Show:
'use strict';

var mqtt = require('mqtt'),
  webduino = require('../webduino');

var push = Array.prototype.push;

var Transport = webduino.Transport,
  TransportEvent = webduino.TransportEvent,
  util = webduino.util,
  proto;

var STATUS = {
  OK: 'OK'
};

var TOPIC = {
  PING: '/PING',
  PONG: '/PONG',
  STATUS: '/STATUS'
};

var MQTT_EVENTS = {
  CONNECT: 'connect',
  MESSAGE: 'message',
  CLOSE: 'close',
  ERROR: 'error'
};

/**
 * Conveying messages over MQTT protocol, in Node.JS.
 *
 * @namespace webduino.transport
 * @class NodeMqttTransport
 * @constructor
 * @param {Object} options Options to build a proper transport
 * @extends webduino.Transport
 */
function NodeMqttTransport(options) {
  Transport.call(this, options);

  this._options = options;
  this._client = null;
  this._sendTimer = null;
  this._buf = [];

  this._status = '';

  this._connHandler = onConnect.bind(this);
  this._messageHandler = onMessage.bind(this);
  this._sendOutHandler = sendOut.bind(this);
  this._disconnHandler = onDisconnect.bind(this);
  this._errorHandler = onError.bind(this);

  init(this);
}

function init(self) {
  self._client = mqtt.connect(self._options.server, {
    clientId: '_' + self._options.device + (self._options.multi ? '.' + util.randomId() : ''),
    username: self._options.login || '',
    password: Buffer.from(self._options.password || ''),
    keepalive: NodeMqttTransport.KEEPALIVE_INTERVAL,
    reconnectPeriod: self._options.autoReconnect ? NodeMqttTransport.RECONNECT_PERIOD * 1000 : 0,
    connectTimeout: NodeMqttTransport.CONNECT_TIMEOUT * 1000
  });
  self._client.on(MQTT_EVENTS.CONNECT, self._connHandler);
  self._client.on(MQTT_EVENTS.MESSAGE, self._messageHandler);
  self._client.on(MQTT_EVENTS.CLOSE, self._disconnHandler);
  self._client.on(MQTT_EVENTS.ERROR, self._errorHandler);
}

function onConnect() {
  this._client.subscribe(this._options.device + TOPIC.PONG);
  this._client.subscribe(this._options.device + TOPIC.STATUS);
}

function onMessage(topic, message) {
  try {
    var dest = topic,
      oldStatus = this._status;

    switch (dest.substr(dest.lastIndexOf('/') + 1)) {

    case 'STATUS':
      this._status = message.toString();
      detectStatusChange(this, this._status, oldStatus);
      break;

    default:
      (this._status === STATUS.OK) && this.emit(TransportEvent.MESSAGE, message);
      break;

    }
  } catch (err) {
    this.emit(TransportEvent.ERROR, err);
  }
}

function detectStatusChange(self, newStatus, oldStatus) {
  if (newStatus === oldStatus) {
    if (newStatus === STATUS.OK) {
      // Device reconnected
      self.emit(TransportEvent.REOPEN);
    }
    return;
  }

  if (newStatus === STATUS.OK) {
    self.emit(TransportEvent.OPEN);
  } else {
    self.emit(TransportEvent.ERROR, new Error('board connection failed.'));
  }
}

function onDisconnect() {
  if (this._client.disconnecting || !this._options.autoReconnect) {
    this._client.removeAllListeners();
    delete this._client;
    this.emit(TransportEvent.CLOSE);
  }
}

function onError(error) {
  this.emit(TransportEvent.ERROR, error);
}

function sendOut() {
  var payload = Buffer.from(this._buf);
  this.isOpen && this._client.publish(this._options.device + TOPIC.PING, payload, {
    qos: 0
  });
  clearBuf(this);
}

function clearBuf(self) {
  self._buf = [];
  clearImmediate(self._sendTimer);
  self._sendTimer = null;
}

NodeMqttTransport.prototype = proto = Object.create(Transport.prototype, {

  constructor: {
    value: NodeMqttTransport
  },

  isOpen: {
    get: function () {
      return this._client && this._client.connected;
    }
  }

});

proto.send = function (payload) {
  if (this._buf.length + payload.length + this._options.device.length + TOPIC.PING.length + 4 >
    NodeMqttTransport.MAX_PACKET_SIZE) {
    this._sendOutHandler();
  }
  push.apply(this._buf, payload);
  if (!this._sendTimer) {
    this._sendTimer = setImmediate(this._sendOutHandler);
  }
};

proto.close = function () {
  if (this._client) {
    if (this._client.connected) {
      this._client.end();
    } else {
      this._client.removeAllListeners();
      delete this._client;
    }
  }
};

proto.flush = function () {
  if (this._buf && this._buf.length) {
    this._sendOutHandler();
  }
};

/**
 * Reconnect period when MQTT connection goes down. Measured in seconds.
 *
 * @property RECONNECT_PERIOD
 * @type {Number}
 * @static
 */
NodeMqttTransport.RECONNECT_PERIOD = 1;

/**
 * MQTT keepalive interval. Measured in seconds.
 *
 * @property KEEPALIVE_INTERVAL
 * @type {Number}
 * @static
 */
NodeMqttTransport.KEEPALIVE_INTERVAL = 15;

/**
 * Time to wait before throwing connection timeout exception. Measured in seconds.
 *
 * @property CONNECT_TIMEOUT
 * @type {Number}
 * @static
 */
NodeMqttTransport.CONNECT_TIMEOUT = 30;

/**
 * Maximum packet size in KB.
 *
 * @property MAX_PACKET_SIZE
 * @type {Number}
 * @static
 */
NodeMqttTransport.MAX_PACKET_SIZE = 128;

module.exports = NodeMqttTransport;