1 /*
2  * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3  */
4 
5 #include "aiot_mqtt_api.h"
6 #include "aiot_state_api.h"
7 #include "aiot_sysdep_api.h"
8 #include "amp_platform.h"
9 #include "amp_task.h"
10 #include "aos/kv.h"
11 #include "aos_system.h"
12 #include "be_inl.h"
13 #include "module_mqtt.h"
14 #include "py_defines.h"
15 
16 #define MOD_STR "MQTT"
17 
18 /* 位于portfiles/aiot_port文件夹下的系统适配函数集合 */
19 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
20 
21 /* 位于external/ali_ca_cert.c中的服务器证书 */
22 extern const char *ali_ca_cert;
23 
24 uint8_t mqtt_process_thread_running = 0;
25 uint8_t mqtt_recv_thread_running = 0;
26 
27 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
mqtt_process_thread(void * args)28 void mqtt_process_thread(void *args)
29 {
30     int32_t res = STATE_SUCCESS;
31 
32     while (mqtt_process_thread_running) {
33         res = aiot_mqtt_process(args);
34         if (res == STATE_USER_INPUT_EXEC_DISABLED) {
35             break;
36         }
37         aos_msleep(1000);
38     }
39     aos_task_exit(0);
40     return;
41 }
42 
43 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
mqtt_recv_thread(void * args)44 void mqtt_recv_thread(void *args)
45 {
46     int32_t res = STATE_SUCCESS;
47 
48     while (mqtt_recv_thread_running) {
49         res = aiot_mqtt_recv(args);
50         if (res < STATE_SUCCESS) {
51             if (res == STATE_USER_INPUT_EXEC_DISABLED) {
52                 break;
53             }
54             aos_msleep(1000);
55         }
56     }
57     aos_task_exit(0);
58     return;
59 }
60 
61 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时,
62  * 且无对应用户回调处理时被调用 */
mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)63 void mqtt_recv_handler(void *handle, const aiot_mqtt_recv_t *packet,
64                        void *userdata)
65 {
66     switch (packet->type) {
67     case AIOT_MQTTRECV_HEARTBEAT_RESPONSE:
68         {
69             // amp_debug(MOD_STR, "heartbeat response");
70             /* TODO: 处理服务器对心跳的回应, 一般不处理 */
71         }
72         break;
73 
74     case AIOT_MQTTRECV_SUB_ACK:
75         {
76             amp_debug(MOD_STR,
77                       "suback, res: -0x%04X, packet id: %d, max qos: %d",
78                       -packet->data.sub_ack.res, packet->data.sub_ack.packet_id,
79                       packet->data.sub_ack.max_qos);
80             /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
81         }
82         break;
83 
84     case AIOT_MQTTRECV_PUB:
85         {
86             amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s",
87                       packet->data.pub.qos, packet->data.pub.topic_len,
88                       packet->data.pub.topic);
89             amp_debug(MOD_STR, "pub, payload: %.*s",
90                       packet->data.pub.payload_len, packet->data.pub.payload);
91             /* TODO: 处理服务器下发的业务报文 */
92         }
93         break;
94 
95     case AIOT_MQTTRECV_PUB_ACK:
96         {
97             amp_debug(MOD_STR, "puback, packet id: %d",
98                       packet->data.pub_ack.packet_id);
99             /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
100         }
101         break;
102 
103     default:
104         {
105         }
106     }
107 }
108 
109 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发,
110  * 事件定义见core/aiot_mqtt_api.h */
mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)111 void mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event,
112                         void *userdata)
113 {
114     switch (event->type) {
115     /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功
116      */
117     case AIOT_MQTTEVT_CONNECT:
118         {
119             amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
120             /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
121             int js_cb_ref = (int *)userdata;
122             amp_debug(MOD_STR, "js cb ref is: %d", js_cb_ref);
123             duk_context *ctx = be_get_context();
124             be_push_ref(ctx, js_cb_ref);
125             duk_push_int(ctx, 9);
126             if (duk_pcall(ctx, 1) != DUK_EXEC_SUCCESS) {
127                 amp_console("%s", duk_safe_to_stacktrace(ctx, -1));
128             }
129             duk_pop(ctx);
130 
131             duk_gc(ctx, 0);
132         }
133         break;
134 
135     /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
136     case AIOT_MQTTEVT_RECONNECT:
137         {
138             amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
139             /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
140         }
141         break;
142 
143     /* SDK因为网络的状况而被动断开了连接, network是底层读写失败,
144      * heartbeat是没有按预期得到服务端心跳应答 */
145     case AIOT_MQTTEVT_DISCONNECT:
146         {
147             char *cause = (event->data.disconnect ==
148                            AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT)
149                               ? ("network disconnect")
150                               : ("heartbeat disconnect");
151             amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
152             /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
153         }
154         break;
155 
156     default:
157         {
158         }
159     }
160 }
161 
mqtt_client_start(void ** handle,amp_mqtt_params_t * mqtt_params)162 int32_t mqtt_client_start(void **handle, amp_mqtt_params_t *mqtt_params)
163 {
164     int32_t res = STATE_SUCCESS;
165     void *mqtt_handle = NULL;
166     char *host = mqtt_params->host;
167     uint16_t port = mqtt_params->port;
168     char *clientid = mqtt_params->clientid;
169     char *username = mqtt_params->username;
170     char *password = mqtt_params->password;
171     uint16_t keepaliveSec = mqtt_params->keepaliveSec;
172     int js_cb_ref = mqtt_params->js_cb_ref;
173     aiot_sysdep_network_cred_t cred;
174 
175     /* 配置SDK的底层依赖 */
176     aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
177     /* 配置SDK的日志输出 */
178     // aiot_state_set_logcb(demo_state_logcb);
179 
180     /* 创建SDK的安全凭据, 用于建立TLS连接 */
181     memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
182     cred.option =
183         AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA; /* 使用RSA证书校验MQTT服务端 */
184     cred.max_tls_fragment =
185         16384; /* 最大的分片长度为16K, 其它可选值还有4K, 2K, 1K, 0.5K */
186     cred.sni_enabled = 1; /* TLS建连时, 支持Server Name Indicator */
187     cred.x509_server_cert = ali_ca_cert; /* 用来验证MQTT服务端的RSA根证书 */
188     cred.x509_server_cert_len =
189         strlen(ali_ca_cert); /* 用来验证MQTT服务端的RSA根证书长度 */
190 
191     /* 创建1个MQTT客户端实例并内部初始化默认参数 */
192     mqtt_handle = aiot_mqtt_init();
193 
194     if (mqtt_handle == NULL) {
195         amp_debug(MOD_STR, "aiot_mqtt_init failed");
196         aos_free(mqtt_handle);
197         return NULL;
198     }
199 
200     /* TODO: 如果以下代码不被注释, 则例程会用TCP而不是TLS连接云平台 */
201 
202     {
203         memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
204         cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
205     }
206 
207     /* 配置MQTT服务器地址 */
208     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
209     /* 配置MQTT服务器端口 */
210     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
211     /* 配置设备productKey */
212     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLIENTID, (void *)clientid);
213     /* 配置设备deviceName */
214     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERNAME, (void *)username);
215     /* 配置设备deviceSecret */
216     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PASSWORD, (void *)password);
217     /* 配置网络连接的安全凭据, 上面已经创建好了 */
218     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
219     /* 配置MQTT心跳间隔 */
220     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC,
221                      (void *)&keepaliveSec);
222     /* 配置回调参数 */
223     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, js_cb_ref);
224     /* 配置MQTT默认消息接收回调函数 */
225     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER,
226                      (void *)mqtt_recv_handler);
227     /* 配置MQTT事件回调函数 */
228     aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER,
229                      (void *)mqtt_event_handler);
230 
231     /* 与服务器建立MQTT连接 */
232     res = aiot_mqtt_connect(mqtt_handle);
233     if (res < STATE_SUCCESS) {
234         /* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */
235         aiot_mqtt_deinit(&mqtt_handle);
236         amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
237         aos_task_exit(0);
238         return NULL;
239     }
240 
241     /* 创建一个单独的线程, 专用于执行aiot_mqtt_process, 它会自动发送心跳保活,
242      * 以及重发QoS1的未应答报文 */
243     mqtt_process_thread_running = 1;
244 
245     aos_task_t mqtt_process_task;
246 
247     if (aos_task_new_ext(&mqtt_process_task, "mqtt_process",
248                          mqtt_process_thread, mqtt_handle, 1024 * 4,
249                          AOS_DEFAULT_APP_PRI) != 0) {
250         amp_debug(MOD_STR, "management mqtt process task create failed!");
251         aiot_mqtt_deinit(&mqtt_handle);
252         aos_task_exit(0);
253         return NULL;
254     }
255     amp_debug(MOD_STR, "app mqtt process start");
256 
257     /* 创建一个单独的线程用于执行aiot_mqtt_recv,
258      * 它会循环收取服务器下发的MQTT消息, 并在断线时自动重连 */
259     mqtt_recv_thread_running = 1;
260 
261     aos_task_t mqtt_rec_task;
262 
263     if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec", mqtt_recv_thread,
264                          mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
265         amp_debug(MOD_STR, "management mqtt rec task create failed!");
266         aiot_mqtt_deinit(&mqtt_handle);
267         aos_task_exit(0);
268         return NULL;
269     }
270     amp_debug(MOD_STR, "app mqtt rec start");
271 
272     *handle = mqtt_handle;
273 
274     return res;
275 }
276 
277 /* mqtt stop */
mqtt_client_stop(void ** handle)278 int32_t mqtt_client_stop(void **handle)
279 {
280     int32_t res = STATE_SUCCESS;
281     void *mqtt_handle = NULL;
282 
283     mqtt_handle = *handle;
284 
285     mqtt_process_thread_running = 0;
286     mqtt_recv_thread_running = 0;
287 
288     /* 断开MQTT连接 */
289     res = aiot_mqtt_disconnect(mqtt_handle);
290     if (res < STATE_SUCCESS) {
291         aiot_mqtt_deinit(&mqtt_handle);
292         amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
293         return -1;
294     }
295 
296     /* 销毁MQTT实例 */
297     res = aiot_mqtt_deinit(&mqtt_handle);
298     if (res < STATE_SUCCESS) {
299         amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
300         return -1;
301     }
302 
303     return res;
304 }
305