import * as event from 'events';
import * as MQTT from 'MQTT';

/** Placeholder used when the caller supplies no usable callback. */
function noop() { }

/**
 * Calls `callback` with no arguments when it is a function; otherwise does
 * nothing. Centralizes the optional success/fail callback handling used by
 * subscribe, unsubscribe and publish.
 *
 * @param {*} callback - Candidate callback taken from a caller's options.
 */
function callIfFunction(callback) {
    if (typeof callback === 'function') {
        callback();
    }
}

/**
 * Event-emitting wrapper around the platform `MQTT` module.
 *
 * Events emitted by this class:
 *  - 'connect'    : the start callback reported code 0
 *  - 'reconnect'  : the start callback reported code 1
 *  - 'disconnect' : the start callback reported code 2
 *  - 'message'    : (topic, payload), the start callback reported code 3
 *  - 'close'      : MQTT.close reported 0
 *  - 'error'      : (string) a subscribe/unsubscribe/publish/close operation
 *                   failed or was attempted while disconnected
 *
 * NOTE(review): with a Node-compatible EventEmitter, emitting 'error' while no
 * 'error' listener is attached throws, so callers should register one.
 */
class MQTTClient extends event.EventEmitter {
    /**
     * Builds the connection options and immediately starts connecting.
     *
     * @param {Object} options
     * @param {string} options.host - Broker host (required).
     * @param {number} [options.port=1883] - Broker port.
     * @param {string} [options.clientId] - Client id; a random 'amp-<n>' id is
     *     generated when omitted.
     * @param {string} [options.username='']
     * @param {string} [options.password='']
     * @param {number} [options.keepalive_interval=60]
     * @param {Function} [options.success] - Called when the first connection
     *     is reported (code 0).
     * @param {Function} [options.fail] - Called when the start callback
     *     arrives without a handle.
     * @throws {Error} When `options` or `options.host` is missing.
     */
    constructor(options) {
        super();
        if (!options || !options.host) {
            throw new Error('options is invalid');
        }

        this.options = {
            host: options.host,
            port: options.port || 1883,
            client_id: options.clientId || this._getRandomClientId(),
            username: options.username || '',
            password: options.password || '',
            keepalive_interval: options.keepalive_interval || 60
        };

        // Only keep real functions so invoking these later can never throw
        // on a truthy non-function value.
        this._fail = typeof options.fail === 'function' ? options.fail : noop;
        this._success = typeof options.success === 'function' ? options.success : noop;
        this.connected = false;
        // 0 means "no native handle yet"; the other methods treat a falsy
        // handle as "not initialized".
        this.mqttInstance = 0;
        this._connect();
    }

    /**
     * Generates a default client id of the form 'amp-<integer in [0, 1000000)>'.
     *
     * Math.floor is used rather than parseInt: parseInt stringifies its
     * argument first, so a tiny value such as 5e-7 would parse as 5.
     *
     * @returns {string}
     */
    _getRandomClientId() {
        return `amp-${Math.floor(Math.random() * 1000000)}`;
    }

    /**
     * Starts the native client and translates its status callbacks into
     * events (see the class comment for the code-to-event mapping).
     */
    _connect() {
        MQTT.start(this.options, (res) => {
            if (!res.handle) {
                // NOTE(review): assumes a callback without a handle signals a
                // failed start - confirm against the MQTT binding.
                this._fail();
                return;
            }
            this.mqttInstance = res.handle;
            switch (res.code) {
                case 0:
                    this.connected = true;
                    // The success callback was previously stored but never
                    // invoked; it now fires on the initial connect only.
                    this._success();
                    this.emit('connect');
                    break;
                case 1:
                    this.connected = true;
                    this.emit('reconnect');
                    break;
                case 2:
                    this.connected = false;
                    this.emit('disconnect');
                    break;
                case 3:
                    this.emit('message', res.topic, res.payload);
                    break;
                default:
                    break;
            }
        });
    }

    /**
     * Subscribes to a topic.
     *
     * @param {Object} options
     * @param {string} options.topic - Topic to subscribe to (required).
     * @param {number} [options.qos=0]
     * @param {Function} [options.success] - Called when the native call
     *     returns a non-negative value.
     * @param {Function} [options.fail] - Called when the native call returns
     *     a negative value (an 'error' event follows).
     * @throws {Error} When there is no native handle or the topic is missing.
     */
    subscribe(options) {
        if (!this.mqttInstance || !options || !options.topic) {
            throw new Error('mqtt not init or options invalid');
        }

        if (this.connected === false) {
            this.emit('error', 'subscribe fail: not connected');
            return;
        }

        const ret = MQTT.subscribe(this.mqttInstance, options.topic, options.qos || 0);
        if (ret < 0) {
            callIfFunction(options.fail);
            this.emit('error', 'subscribe error');
            return;
        }

        callIfFunction(options.success);
    }

    /**
     * Unsubscribes from a topic.
     *
     * @param {Object} options
     * @param {string} options.topic - Topic to unsubscribe from (required).
     * @param {Function} [options.success] - Called when the native call
     *     returns a non-negative value.
     * @param {Function} [options.fail] - Called when the native call returns
     *     a negative value (an 'error' event follows).
     * @throws {Error} When there is no native handle or the topic is missing.
     */
    unsubscribe(options) {
        if (!this.mqttInstance || !options || !options.topic) {
            throw new Error('mqtt not init or mqtt topic is invalid');
        }

        if (this.connected === false) {
            this.emit('error', 'unsubscribe fail: not connected');
            return;
        }

        const ret = MQTT.unsubscribe(this.mqttInstance, options.topic);
        if (ret < 0) {
            callIfFunction(options.fail);
            this.emit('error', 'unsubscribe error');
            return;
        }

        callIfFunction(options.success);
    }

    /**
     * Publishes a message; the outcome is reported through the callback that
     * MQTT.publish invokes.
     *
     * @param {Object} options
     * @param {string} options.topic - Target topic (required).
     * @param {*} options.message - Payload (required; any falsy value, e.g.
     *     an empty string, is rejected).
     * @param {number} [options.qos=0]
     * @param {Function} [options.success] - Called when the reported result
     *     is non-negative.
     * @param {Function} [options.fail] - Called when the reported result is
     *     negative; an 'error' event carrying the topic follows.
     * @throws {Error} When there is no native handle, or topic/message is
     *     missing.
     */
    publish(options) {
        if (!this.mqttInstance || !options || !options.topic || !options.message) {
            throw new Error('mqtt not init or options invalid');
        }

        if (this.connected === false) {
            this.emit('error', 'publish fail: not connected');
            return;
        }

        MQTT.publish(this.mqttInstance, options.topic, options.message, options.qos || 0, (ret) => {
            if (ret < 0) {
                callIfFunction(options.fail);
                // The failing topic is the error payload so listeners can
                // tell which publish failed.
                this.emit('error', options.topic);
                return;
            }

            callIfFunction(options.success);
        });
    }

    /**
     * Closes the native client. Emits 'close' when MQTT.close reports 0,
     * otherwise emits 'error'.
     *
     * @throws {Error} When there is no native handle.
     */
    close() {
        if (!this.mqttInstance) {
            throw new Error('mqtt not init');
        }

        MQTT.close(this.mqttInstance, (ret) => {
            // NOTE(review): strict comparison assumes the binding reports a
            // numeric status - confirm.
            if (ret !== 0) {
                this.emit('error', 'mqtt client close error');
                return;
            }
            this.emit('close');
        });
    }
}

/**
 * Creates an MQTT client and starts connecting immediately.
 *
 * @param {Object} options - See the MQTTClient constructor.
 * @returns {MQTTClient}
 * @throws {Error} When `options` or `options.host` is missing.
 */
export function createClient(options) {
    return new MQTTClient(options);
}