1import * as event from 'events'
2import * as MQTT from 'MQTT'
3
4class MQTTClient extends event.EventEmitter {
5    constructor(options) {
6        super();
7        if (!options || !options.host) {
8            throw new Error('options is invalid');
9        }
10
11        this.options = {
12            host: options.host,
13            port: options.port || 1883,
14            client_id: options.clientId || this._getRandomClientId(),
15            username: options.username || '',
16            password: options.password || '',
17            keepalive_interval: options.keepalive_interval || 60
18        };
19
20        this._fail = options.fail || function () { };
21        this._success = options.success || function () { };
22        this.connected = false;
23        this.mqttInstance = 0;
24        this._connect();
25    }
26
27    _getRandomClientId() {
28        return 'amp-' + parseInt(Math.random() * 1000000);
29    }
30
31    _connect() {
32        MQTT.start(this.options, function (res) {
33            if (!res.handle) {
34                return;
35            }
36            this.mqttInstance = res.handle;
37            switch (res.code) {
38                case 0:
39                    this.connected = true;
40                    this.emit('connect'); break;
41                case 1:
42                    this.connected = true;
43                    this.emit('reconnect'); break;
44                case 2:
45                    this.connected = false;
46                    this.emit('disconnect'); break;
47                case 3:
48                    this.emit('message', res.topic, res.payload); break;
49                default: break;
50            }
51        }.bind(this));
52    }
53
54    subscribe(options) {
55        if (!this.mqttInstance || !options || !options.topic) {
56            throw new Error('mqtt not init or options invalid');
57        }
58
59        if (this.connected === false) {
60            this.emit('error', 'subscirbe fail: not connected');
61            return;
62        }
63
64        var ret = MQTT.subscribe(this.mqttInstance, options.topic, options.qos || 0);
65        if (ret < 0) {
66            if (typeof options.fail === 'function') {
67                options.fail();
68            }
69            this.emit('error', 'subscribe error');
70        }
71        else {
72            if (typeof options.success === 'function') {
73                options.success();
74            }
75        }
76    }
77
78    unsubscribe(options) {
79        if (!this.mqttInstance || !options || !options.topic) {
80            throw new Error('mqtt not init or mqtt topic is invalid');
81        }
82
83        if (this.connected === false) {
84            this.emit('error', 'unsubscribe fail: not connected');
85            return;
86        }
87
88        var ret = MQTT.unsubscribe(this.mqttInstance, options.topic);
89        if (ret < 0) {
90            if (typeof options.fail === 'function') {
91                options.fail();
92            }
93            this.emit('error', 'unsubscribe error');
94            return
95        }
96
97        if (typeof options.success === 'function') {
98            options.success();
99        }
100    }
101
102    publish(options) {
103        if (!this.mqttInstance || !options || !options.topic || !options.message) {
104            throw new Error('mqtt not init or options invalid');
105        }
106
107        if (this.connected === false) {
108            this.emit('error', 'publish fail: not connected');
109            return;
110        }
111
112        MQTT.publish(this.mqttInstance, options.topic, options.message, options.qos || 0, function (ret) {
113            if (ret < 0) {
114                if (typeof options.fail === 'function') {
115                    options.fail();
116                }
117                this.emit('error', options.topic);
118                return;
119            }
120
121            if (typeof options.success === 'function') {
122                options.success();
123            }
124        }.bind(this));
125    }
126
127    close() {
128        if (!this.mqttInstance) {
129            throw new Error('mqtt not init');
130        }
131        MQTT.close(this.mqttInstance, function (ret) {
132            if (ret != 0) {
133                this.emit('error', 'mqtt client close error');
134                return;
135            }
136            this.emit('close');
137        }.bind(this));
138
139    }
140}
141
142export function createClient(options) {
143    return new MQTTClient(options);
144}
145