1 /*
2 * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3 */
4
5 #include "aiot_mqtt_api.h"
6 #include "aiot_state_api.h"
7 #include "aiot_sysdep_api.h"
8 #include "amp_platform.h"
9 #include "amp_task.h"
10 #include "aos/kv.h"
11 #include "aos_system.h"
12 #include "be_inl.h"
13 #include "module_mqtt.h"
14 #include "py_defines.h"
15
16 #define MOD_STR "MQTT"
17
18 /* 位于portfiles/aiot_port文件夹下的系统适配函数集合 */
19 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
20
21 /* 位于external/ali_ca_cert.c中的服务器证书 */
22 extern const char *ali_ca_cert;
23
24 uint8_t mqtt_process_thread_running = 0;
25 uint8_t mqtt_recv_thread_running = 0;
26
27 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
mqtt_process_thread(void * args)28 void mqtt_process_thread(void *args)
29 {
30 int32_t res = STATE_SUCCESS;
31
32 while (mqtt_process_thread_running) {
33 res = aiot_mqtt_process(args);
34 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
35 break;
36 }
37 aos_msleep(1000);
38 }
39 aos_task_exit(0);
40 return;
41 }
42
43 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
mqtt_recv_thread(void * args)44 void mqtt_recv_thread(void *args)
45 {
46 int32_t res = STATE_SUCCESS;
47
48 while (mqtt_recv_thread_running) {
49 res = aiot_mqtt_recv(args);
50 if (res < STATE_SUCCESS) {
51 if (res == STATE_USER_INPUT_EXEC_DISABLED) {
52 break;
53 }
54 aos_msleep(1000);
55 }
56 }
57 aos_task_exit(0);
58 return;
59 }
60
61 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时,
62 * 且无对应用户回调处理时被调用 */
mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)63 void mqtt_recv_handler(void *handle, const aiot_mqtt_recv_t *packet,
64 void *userdata)
65 {
66 switch (packet->type) {
67 case AIOT_MQTTRECV_HEARTBEAT_RESPONSE:
68 {
69 // amp_debug(MOD_STR, "heartbeat response");
70 /* TODO: 处理服务器对心跳的回应, 一般不处理 */
71 }
72 break;
73
74 case AIOT_MQTTRECV_SUB_ACK:
75 {
76 amp_debug(MOD_STR,
77 "suback, res: -0x%04X, packet id: %d, max qos: %d",
78 -packet->data.sub_ack.res, packet->data.sub_ack.packet_id,
79 packet->data.sub_ack.max_qos);
80 /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
81 }
82 break;
83
84 case AIOT_MQTTRECV_PUB:
85 {
86 amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s",
87 packet->data.pub.qos, packet->data.pub.topic_len,
88 packet->data.pub.topic);
89 amp_debug(MOD_STR, "pub, payload: %.*s",
90 packet->data.pub.payload_len, packet->data.pub.payload);
91 /* TODO: 处理服务器下发的业务报文 */
92 }
93 break;
94
95 case AIOT_MQTTRECV_PUB_ACK:
96 {
97 amp_debug(MOD_STR, "puback, packet id: %d",
98 packet->data.pub_ack.packet_id);
99 /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
100 }
101 break;
102
103 default:
104 {
105 }
106 }
107 }
108
109 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发,
110 * 事件定义见core/aiot_mqtt_api.h */
mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)111 void mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event,
112 void *userdata)
113 {
114 switch (event->type) {
115 /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功
116 */
117 case AIOT_MQTTEVT_CONNECT:
118 {
119 amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
120 /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
121 int js_cb_ref = (int *)userdata;
122 amp_debug(MOD_STR, "js cb ref is: %d", js_cb_ref);
123 duk_context *ctx = be_get_context();
124 be_push_ref(ctx, js_cb_ref);
125 duk_push_int(ctx, 9);
126 if (duk_pcall(ctx, 1) != DUK_EXEC_SUCCESS) {
127 amp_console("%s", duk_safe_to_stacktrace(ctx, -1));
128 }
129 duk_pop(ctx);
130
131 duk_gc(ctx, 0);
132 }
133 break;
134
135 /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
136 case AIOT_MQTTEVT_RECONNECT:
137 {
138 amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
139 /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
140 }
141 break;
142
143 /* SDK因为网络的状况而被动断开了连接, network是底层读写失败,
144 * heartbeat是没有按预期得到服务端心跳应答 */
145 case AIOT_MQTTEVT_DISCONNECT:
146 {
147 char *cause = (event->data.disconnect ==
148 AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT)
149 ? ("network disconnect")
150 : ("heartbeat disconnect");
151 amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
152 /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
153 }
154 break;
155
156 default:
157 {
158 }
159 }
160 }
161
mqtt_client_start(void ** handle,amp_mqtt_params_t * mqtt_params)162 int32_t mqtt_client_start(void **handle, amp_mqtt_params_t *mqtt_params)
163 {
164 int32_t res = STATE_SUCCESS;
165 void *mqtt_handle = NULL;
166 char *host = mqtt_params->host;
167 uint16_t port = mqtt_params->port;
168 char *clientid = mqtt_params->clientid;
169 char *username = mqtt_params->username;
170 char *password = mqtt_params->password;
171 uint16_t keepaliveSec = mqtt_params->keepaliveSec;
172 int js_cb_ref = mqtt_params->js_cb_ref;
173 aiot_sysdep_network_cred_t cred;
174
175 /* 配置SDK的底层依赖 */
176 aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
177 /* 配置SDK的日志输出 */
178 // aiot_state_set_logcb(demo_state_logcb);
179
180 /* 创建SDK的安全凭据, 用于建立TLS连接 */
181 memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
182 cred.option =
183 AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA; /* 使用RSA证书校验MQTT服务端 */
184 cred.max_tls_fragment =
185 16384; /* 最大的分片长度为16K, 其它可选值还有4K, 2K, 1K, 0.5K */
186 cred.sni_enabled = 1; /* TLS建连时, 支持Server Name Indicator */
187 cred.x509_server_cert = ali_ca_cert; /* 用来验证MQTT服务端的RSA根证书 */
188 cred.x509_server_cert_len =
189 strlen(ali_ca_cert); /* 用来验证MQTT服务端的RSA根证书长度 */
190
191 /* 创建1个MQTT客户端实例并内部初始化默认参数 */
192 mqtt_handle = aiot_mqtt_init();
193
194 if (mqtt_handle == NULL) {
195 amp_debug(MOD_STR, "aiot_mqtt_init failed");
196 aos_free(mqtt_handle);
197 return NULL;
198 }
199
200 /* TODO: 如果以下代码不被注释, 则例程会用TCP而不是TLS连接云平台 */
201
202 {
203 memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
204 cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
205 }
206
207 /* 配置MQTT服务器地址 */
208 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
209 /* 配置MQTT服务器端口 */
210 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
211 /* 配置设备productKey */
212 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_CLIENTID, (void *)clientid);
213 /* 配置设备deviceName */
214 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERNAME, (void *)username);
215 /* 配置设备deviceSecret */
216 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PASSWORD, (void *)password);
217 /* 配置网络连接的安全凭据, 上面已经创建好了 */
218 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
219 /* 配置MQTT心跳间隔 */
220 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC,
221 (void *)&keepaliveSec);
222 /* 配置回调参数 */
223 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, js_cb_ref);
224 /* 配置MQTT默认消息接收回调函数 */
225 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER,
226 (void *)mqtt_recv_handler);
227 /* 配置MQTT事件回调函数 */
228 aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER,
229 (void *)mqtt_event_handler);
230
231 /* 与服务器建立MQTT连接 */
232 res = aiot_mqtt_connect(mqtt_handle);
233 if (res < STATE_SUCCESS) {
234 /* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */
235 aiot_mqtt_deinit(&mqtt_handle);
236 amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
237 aos_task_exit(0);
238 return NULL;
239 }
240
241 /* 创建一个单独的线程, 专用于执行aiot_mqtt_process, 它会自动发送心跳保活,
242 * 以及重发QoS1的未应答报文 */
243 mqtt_process_thread_running = 1;
244
245 aos_task_t mqtt_process_task;
246
247 if (aos_task_new_ext(&mqtt_process_task, "mqtt_process",
248 mqtt_process_thread, mqtt_handle, 1024 * 4,
249 AOS_DEFAULT_APP_PRI) != 0) {
250 amp_debug(MOD_STR, "management mqtt process task create failed!");
251 aiot_mqtt_deinit(&mqtt_handle);
252 aos_task_exit(0);
253 return NULL;
254 }
255 amp_debug(MOD_STR, "app mqtt process start");
256
257 /* 创建一个单独的线程用于执行aiot_mqtt_recv,
258 * 它会循环收取服务器下发的MQTT消息, 并在断线时自动重连 */
259 mqtt_recv_thread_running = 1;
260
261 aos_task_t mqtt_rec_task;
262
263 if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec", mqtt_recv_thread,
264 mqtt_handle, 1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
265 amp_debug(MOD_STR, "management mqtt rec task create failed!");
266 aiot_mqtt_deinit(&mqtt_handle);
267 aos_task_exit(0);
268 return NULL;
269 }
270 amp_debug(MOD_STR, "app mqtt rec start");
271
272 *handle = mqtt_handle;
273
274 return res;
275 }
276
277 /* mqtt stop */
mqtt_client_stop(void ** handle)278 int32_t mqtt_client_stop(void **handle)
279 {
280 int32_t res = STATE_SUCCESS;
281 void *mqtt_handle = NULL;
282
283 mqtt_handle = *handle;
284
285 mqtt_process_thread_running = 0;
286 mqtt_recv_thread_running = 0;
287
288 /* 断开MQTT连接 */
289 res = aiot_mqtt_disconnect(mqtt_handle);
290 if (res < STATE_SUCCESS) {
291 aiot_mqtt_deinit(&mqtt_handle);
292 amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
293 return -1;
294 }
295
296 /* 销毁MQTT实例 */
297 res = aiot_mqtt_deinit(&mqtt_handle);
298 if (res < STATE_SUCCESS) {
299 amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
300 return -1;
301 }
302
303 return res;
304 }
305