1'use strict'; 2 3const EventEmitter = require('events'); 4 5class MQTTClient extends EventEmitter{ 6 constructor(options) { 7 super(); 8 if (!options || !options.host) { 9 throw new Error('options is invalid'); 10 } 11 12 this.options = { 13 host: options.host, 14 port: options.port || 1883, 15 client_id: options.clientId || this._getRandomClientId(), 16 username: options.username || '', 17 password: options.password || '', 18 keepalive_interval: options.keepalive_interval || 60 19 }; 20 21 this._fail = options.fail || function(){}; 22 this._success = options.success || function(){}; 23 this.connected = false; 24 this._connect(); 25 } 26 27 _getRandomClientId() { 28 return 'amp-' + parseInt(Math.random() * 1000000); 29 } 30 31 _connect() { 32 var cb = function(err){ 33 if(err === -2) { 34 // connect failed 35 this._fail(); 36 this.emit('error', 'connect failed'); 37 } 38 39 if(err === -1) { 40 // status: disconnect 41 this.connected = false; 42 this.emit('disconnect'); 43 } 44 if(err === 0) { 45 // status: connect 46 this.connected = true; 47 this._success(); 48 this.emit('connect'); 49 } 50 }; 51 this.mqttInstance = __native.MQTT.start(this.options, cb.bind(this)); 52 if (!this.mqttInstance){ 53 // connect failed 54 this._fail(); 55 this.emit('error', 'connect failed'); 56 return 57 } 58 } 59 60 subscribe(options) { 61 if (!this.mqttInstance || !options || !options.topic) { 62 throw new Error('mqtt not init or options invalid'); 63 } 64 65 if(this.connected === false) { 66 this.emit('error', 'subscirbe fail: not connected'); 67 return; 68 } 69 70 var ret = __native.MQTT.subscribe(this.mqttInstance, options.topic, options.qos || 0, function(topic, payload){ 71 this.emit('message', topic, payload); 72 }.bind(this)); 73 if (ret < 0) { 74 if(typeof options.fail === 'function') { 75 options.fail(); 76 } 77 this.emit('error', 'subscribe error'); 78 } 79 else { 80 if(typeof options.success === 'function') { 81 options.success(); 82 } 83 } 84 } 85 86 unsubscribe(options) { 87 if (!this.mqttInstance || !options || !options.topic) { 88 throw new Error('mqtt not init or mqtt topic is invalid'); 89 } 90 91 if(this.connected === false) { 92 this.emit('error', 'unsubscribe fail: not connected'); 93 return; 94 } 95 96 var ret = __native.MQTT.unsubscribe(this.mqttInstance, options.topic, function() { 97 }.bind(this)); 98 if (ret < 0) { 99 if(typeof options.fail === 'function') { 100 options.fail(); 101 } 102 this.emit('error', 'unsubscribe error'); 103 return 104 } 105 106 if(typeof options.success === 'function') { 107 options.success(); 108 } 109 } 110 111 publish(options) { 112 if (!this.mqttInstance || !options || !options.topic || !options.message) { 113 throw new Error('mqtt not init or options invalid'); 114 } 115 116 if(this.connected === false) { 117 this.emit('error', 'publish fail: not connected'); 118 return; 119 } 120 121 __native.MQTT.publish(this.mqttInstance, options.topic, options.message, options.qos || 0, function(ret) { 122 if (ret < 0) { 123 if(typeof options.fail === 'function') { 124 options.fail(); 125 } 126 this.emit('error', options.topic); 127 return; 128 } 129 130 if(typeof options.success === 'function') { 131 options.success(); 132 } 133 }.bind(this)); 134 } 135 136 close() { 137 if (!this.mqttInstance) { 138 throw new Error('mqtt not init'); 139 } 140 __native.MQTT.close(this.mqttInstance, function(ret){ 141 if (ret != 0) { 142 this.emit('error', 'mqtt client close error'); 143 return; 144 } 145 this.emit('close'); 146 }.bind(this)); 147 148 } 149} 150 151function createClient(options) { 152 return new MQTTClient(options); 153} 154 155module.exports = { 156 createClient, 157}