1 /*
2  * Copyright (C) 2015-2019 Alibaba Group Holding Limited
3  */
4 
5 #include "aiot_dm_api.h"
6 #include "aiot_mqtt_api.h"
7 #include "aiot_state_api.h"
8 #include "aiot_subdev_api.h"
9 #include "aiot_sysdep_api.h"
10 #include "amp_platform.h"
11 #include "amp_task.h"
12 #include "aos/kv.h"
13 #include "aos_system.h"
14 #include "py_defines.h"
15 // #include "be_inl.h"
16 #include "module_aiot.h"
17 #ifdef AOS_COMP_UAGENT
18 #include "uagent.h"
19 #endif
20 #define MOD_STR "AIOT_MQTT"
21 
22 /* System-adaptation function set, located under the portfiles/aiot_port folder */
23 extern aiot_sysdep_portfile_t g_aiot_sysdep_portfile;
24 
25 /* Server certificate, located in external/ali_ca_cert.c */
26 extern const char *ali_ca_cert;
27 
/*
 * Run flags polled by the loops of the MQTT process and recv tasks in this
 * file: client start sets them to 1 before creating each task, client stop
 * resets them to 0 so the loops terminate.
 */
28 uint8_t pyamp_g_app_mqtt_process_thread_running = 0;
29 uint8_t pyamp_g_app_mqtt_recv_thread_running = 0;
30 
/*
 * Copy the first `len` bytes of `src` into a freshly allocated buffer and
 * append a terminating '\0'.
 *
 * src: bytes to copy; does not have to be NUL-terminated. May be NULL.
 * len: number of bytes to copy; must not be negative.
 *
 * Returns a buffer obtained from aos_malloc() (the caller releases it with
 * aos_free()), or NULL when `src` is NULL, `len` is negative, or the
 * allocation fails.
 */
static char *__amp_strdup(char *src, int len)
{
    char *dst;

    /* A negative length would reach memcpy() as an enormous size_t. */
    if (src == NULL || len < 0) {
        return NULL;
    }

    /* Do the +1 in size_t so that len == INT_MAX cannot overflow an int. */
    dst = aos_malloc((size_t)len + 1);
    if (dst == NULL) {
        return NULL;
    }

    memcpy(dst, src, (size_t)len);
    dst[len] = '\0';
    return dst;
}
48 
49 /* 执行aiot_mqtt_process的线程, 包含心跳发送和QoS1消息重发 */
pyamp_aiot_app_mqtt_process_thread(void * args)50 void pyamp_aiot_app_mqtt_process_thread(void *args)
51 {
52     int32_t res = STATE_SUCCESS;
53 
54     while (pyamp_g_app_mqtt_process_thread_running) {
55         res = aiot_mqtt_process(args);
56         if (res == STATE_USER_INPUT_EXEC_DISABLED) {
57             break;
58         }
59         aos_msleep(1000);
60     }
61     aos_task_exit(0);
62     return;
63 }
64 
65 /* 执行aiot_mqtt_recv的线程, 包含网络自动重连和从服务器收取MQTT消息 */
pyamp_aiot_app_mqtt_recv_thread(void * args)66 void pyamp_aiot_app_mqtt_recv_thread(void *args)
67 {
68     int32_t res = STATE_SUCCESS;
69 
70     while (pyamp_g_app_mqtt_recv_thread_running) {
71         res = aiot_mqtt_recv(args);
72         if (res < STATE_SUCCESS) {
73             if (res == STATE_USER_INPUT_EXEC_DISABLED) {
74                 break;
75             }
76             aos_msleep(1000);
77         }
78     }
79     aos_task_exit(0);
80     return;
81 }
82 
83 /* MQTT默认消息处理回调, 当SDK从服务器收到MQTT消息时,
84  * 且无对应用户回调处理时被调用 */
pyamp_aiot_app_mqtt_recv_handler(void * handle,const aiot_mqtt_recv_t * packet,void * userdata)85 void pyamp_aiot_app_mqtt_recv_handler(void *handle,
86                                       const aiot_mqtt_recv_t *packet,
87                                       void *userdata)
88 {
89     void (*callback)(void *userdata) = (void (*)(void *))userdata;
90 
91     switch (packet->type) {
92     case AIOT_MQTTRECV_HEARTBEAT_RESPONSE:
93         {
94             // amp_debug(MOD_STR, "heartbeat response");
95             /* TODO: 处理服务器对心跳的回应, 一般不处理 */
96         }
97         break;
98 
99     case AIOT_MQTTRECV_SUB_ACK:
100         {
101             amp_debug(MOD_STR,
102                       "suback, res: -0x%04X, packet id: %d, max qos: %d",
103                       -packet->data.sub_ack.res, packet->data.sub_ack.packet_id,
104                       packet->data.sub_ack.max_qos);
105             /* TODO: 处理服务器对订阅请求的回应, 一般不处理 */
106         }
107         break;
108 
109     case AIOT_MQTTRECV_PUB:
110         {
111             amp_debug(MOD_STR, "pub, qos: %d, topic: %.*s",
112                       packet->data.pub.qos, packet->data.pub.topic_len,
113                       packet->data.pub.topic);
114             amp_debug(MOD_STR, "pub, payload: %.*s",
115                       packet->data.pub.payload_len, packet->data.pub.payload);
116             /* TODO: 处理服务器下发的业务报文 */
117             iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
118             iot_mqtt_message_t message;
119             memset(&message, 0, sizeof(iot_mqtt_message_t));
120             if (udata && udata->callback) {
121                 message.option = AIOT_MQTTOPT_RECV_HANDLER;
122                 message.recv.type = packet->type;
123                 message.recv.code = AIOT_MQTT_MESSAGE;
124                 message.recv.topic = __amp_strdup(packet->data.pub.topic,
125                                                   packet->data.pub.topic_len);
126                 message.recv.payload = __amp_strdup(
127                     packet->data.pub.payload, packet->data.pub.payload_len);
128                 message.recv.topic_len = packet->data.pub.topic_len;
129                 message.recv.payload_len = packet->data.pub.payload_len;
130                 udata->callback(&message, udata);
131                 aos_free(message.recv.topic);
132                 aos_free(message.recv.payload);
133             }
134         }
135         break;
136 
137     case AIOT_MQTTRECV_PUB_ACK:
138         {
139             amp_debug(MOD_STR, "puback, packet id: %d",
140                       packet->data.pub_ack.packet_id);
141             /* TODO: 处理服务器对QoS1上报消息的回应, 一般不处理 */
142         }
143         break;
144 
145     default:
146         {
147         }
148     }
149 }
150 
151 /* MQTT事件回调函数, 当网络连接/重连/断开时被触发,
152  * 事件定义见core/aiot_mqtt_api.h */
pyamp_aiot_app_mqtt_event_handler(void * handle,const aiot_mqtt_event_t * event,void * userdata)153 void pyamp_aiot_app_mqtt_event_handler(void *handle,
154                                        const aiot_mqtt_event_t *event,
155                                        void *userdata)
156 {
157     iot_mqtt_userdata_t *udata = (iot_mqtt_userdata_t *)userdata;
158     iot_mqtt_message_t message;
159 
160     memset(&message, 0, sizeof(iot_mqtt_message_t));
161     message.option = AIOT_MQTTOPT_EVENT_HANDLER;
162     message.event.type = event->type;
163 
164     switch (event->type) {
165     /* SDK因为用户调用了aiot_mqtt_connect()接口, 与mqtt服务器建立连接已成功
166      */
167     case AIOT_MQTTEVT_CONNECT:
168         {
169             amp_debug(MOD_STR, "AIOT_MQTTEVT_CONNECT");
170             /* TODO: 处理SDK建连成功, 不可以在这里调用耗时较长的阻塞函数 */
171             message.event.code = AIOT_MQTT_CONNECT;
172         }
173         break;
174 
175     /* SDK因为网络状况被动断连后, 自动发起重连已成功 */
176     case AIOT_MQTTEVT_RECONNECT:
177         {
178             amp_debug(MOD_STR, "AIOT_MQTTEVT_RECONNECT");
179             /* TODO: 处理SDK重连成功, 不可以在这里调用耗时较长的阻塞函数 */
180             message.event.code = AIOT_MQTT_RECONNECT;
181         }
182         break;
183 
184     /* SDK因为网络的状况而被动断开了连接, network是底层读写失败,
185      * heartbeat是没有按预期得到服务端心跳应答 */
186     case AIOT_MQTTEVT_DISCONNECT:
187         {
188             char *cause = (event->data.disconnect ==
189                            AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT)
190                               ? ("network disconnect")
191                               : ("heartbeat disconnect");
192             amp_debug(MOD_STR, "AIOT_MQTTEVT_DISCONNECT: %s", cause);
193             /* TODO: 处理SDK被动断连, 不可以在这里调用耗时较长的阻塞函数 */
194             message.event.code = AIOT_MQTT_DISCONNECT;
195         }
196         break;
197 
198     default:
199         {
200             return;
201         }
202     }
203 
204     if (udata && udata->callback)
205         udata->callback(&message, udata);
206 }
207 
208 /* rawdata 函数演示 */
aiot_app_send_rawdata_post(void * dm_handle,char * data,int32_t data_len)209 int32_t aiot_app_send_rawdata_post(void *dm_handle, char *data, int32_t data_len)
210 {
211     aiot_dm_msg_t msg;
212 
213     memset(&msg, 0, sizeof(aiot_dm_msg_t));
214     msg.type = AIOT_DMMSG_RAW_DATA;
215     msg.data.raw_data.data = data;
216     msg.data.raw_data.data_len = data_len;
217     return aiot_dm_send(dm_handle, &msg);
218 }
219 
220 
221 /* 属性上报函数演示 */
pyamp_aiot_app_send_property_post(void * dm_handle,char * params)222 int32_t pyamp_aiot_app_send_property_post(void *dm_handle, char *params)
223 {
224     aiot_dm_msg_t msg;
225 
226     memset(&msg, 0, sizeof(aiot_dm_msg_t));
227     msg.type = AIOT_DMMSG_PROPERTY_POST;
228     msg.data.property_post.params = params;
229 
230     return aiot_dm_send(dm_handle, &msg);
231 }
232 
233 /* 事件上报函数演示 */
pyamp_aiot_app_send_event_post(void * dm_handle,char * event_id,char * params)234 int32_t pyamp_aiot_app_send_event_post(void *dm_handle, char *event_id,
235                                        char *params)
236 {
237     aiot_dm_msg_t msg;
238 
239     memset(&msg, 0, sizeof(aiot_dm_msg_t));
240     msg.type = AIOT_DMMSG_EVENT_POST;
241     msg.data.event_post.event_id = event_id;
242     msg.data.event_post.params = params;
243 
244     return aiot_dm_send(dm_handle, &msg);
245 }
246 
/*
 * Create an MQTT client for the device identity stored in KV, connect it to
 * ${productKey}.iot-as-mqtt.cn-shanghai.aliyuncs.com:443 and start the two
 * tasks that run aiot_mqtt_process() and aiot_mqtt_recv().
 *
 * handle:       out parameter; receives the MQTT handle on success.
 * keepaliveSec: MQTT keep-alive interval in seconds.
 * userdata:     registered as AIOT_MQTTOPT_USERDATA; the SDK hands it back to
 *               the receive and event handlers of this file.
 *
 * Returns STATE_SUCCESS on success and -1 on any failure. On failure the MQTT
 * instance is destroyed, the thread run flags are cleared and *handle is not
 * written. Failures are reported to the caller; the calling task is never
 * terminated from here.
 */
int32_t pyamp_aiot_mqtt_client_start(void **handle, int keepaliveSec,
                                     iot_mqtt_userdata_t *userdata)
{
    int32_t res = STATE_SUCCESS;
    void *mqtt_handle = NULL;
    /* Domain suffix of the Aliyun platform, Shanghai site. */
    const char *url = "iot-as-mqtt.cn-shanghai.aliyuncs.com";
    /* Full address of the platform endpoint: ${productKey}.${url} */
    char host[100] = { 0 };
    int host_len;
    /* The destination port is 443 whether or not TLS is used. */
    uint16_t port = 443;
    /*
     * NOTE(review): the SDK header documents AIOT_MQTTOPT_KEEPALIVE_SEC as
     * taking a uint16_t *, so the option is fed from a variable of that
     * type instead of from the int parameter - confirm against
     * aiot_mqtt_api.h.
     */
    uint16_t keepalive = (uint16_t)keepaliveSec;
    /* Security credentials; CA certificate etc. go here when TLS is used. */
    aiot_sysdep_network_cred_t cred;
    aos_task_t mqtt_process_task;
    aos_task_t mqtt_rec_task;

    /* Device triple (product key, device name, device secret). */
    char product_key[IOTX_PRODUCT_KEY_LEN] = { 0 };
    char device_name[IOTX_DEVICE_NAME_LEN] = { 0 };
    char device_secret[IOTX_DEVICE_SECRET_LEN] = { 0 };

    int productkey_len = IOTX_PRODUCT_KEY_LEN;
    int devicename_len = IOTX_DEVICE_NAME_LEN;
    int devicesecret_len = IOTX_DEVICE_SECRET_LEN;

    if (handle == NULL) {
        amp_debug(MOD_STR, "mqtt client start: handle is NULL");
        return -1;
    }

    /* Without the complete triple the connection cannot succeed. */
    if (aos_kv_get(AMP_CUSTOMER_PRODUCTKEY, product_key,
                   &productkey_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICENAME, device_name,
                   &devicename_len) != 0 ||
        aos_kv_get(AMP_CUSTOMER_DEVICESECRET, device_secret,
                   &devicesecret_len) != 0) {
        amp_debug(MOD_STR, "read device triple from kv failed");
        return -1;
    }

    /* Configure the SDK's low-level dependencies. */
    aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);

    /* Build the security credentials used for a TLS connection. */
    memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
    /* Verify the MQTT server with an RSA certificate. */
    cred.option = AIOT_SYSDEP_NETWORK_CRED_SVRCERT_CA;
    /* Maximum fragment length 16K; other choices are 4K, 2K, 1K, 0.5K. */
    cred.max_tls_fragment = 16384;
    /* Enable Server Name Indicator during the TLS handshake. */
    cred.sni_enabled = 1;
    /* RSA root certificate used to verify the MQTT server, and its length. */
    cred.x509_server_cert = ali_ca_cert;
    cred.x509_server_cert_len = strlen(ali_ca_cert);

    /* Create one MQTT client instance with default parameters. */
    mqtt_handle = aiot_mqtt_init();
    if (mqtt_handle == NULL) {
        amp_debug(MOD_STR, "aiot_mqtt_init failed");
        return -1;
    }

    /*
     * NOTE(review): this block discards the TLS credentials prepared above,
     * so the client connects over plain TCP. Remove it to use TLS.
     */
    {
        memset(&cred, 0, sizeof(aiot_sysdep_network_cred_t));
        cred.option = AIOT_SYSDEP_NETWORK_CRED_NONE;
    }

    host_len = snprintf(host, sizeof(host), "%s.%s", product_key, url);
    if (host_len < 0 || (size_t)host_len >= sizeof(host)) {
        amp_debug(MOD_STR, "mqtt host name too long");
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }

    /* MQTT server address */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)host);
    /* MQTT server port */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
    /* Device productKey */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY,
                     (void *)product_key);
    /* Device deviceName */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME,
                     (void *)device_name);
    /* Device deviceSecret */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET,
                     (void *)device_secret);
    /* Network security credentials prepared above */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
    /* MQTT heartbeat interval */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_KEEPALIVE_SEC,
                     (void *)&keepalive);
    /* Argument handed back to the callbacks */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_USERDATA, userdata);
    /* Default MQTT receive callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER,
                     (void *)pyamp_aiot_app_mqtt_recv_handler);
    /* MQTT event callback */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER,
                     (void *)pyamp_aiot_app_mqtt_event_handler);

    /* Establish the MQTT connection with the server. */
    res = aiot_mqtt_connect(mqtt_handle);
    if (res < STATE_SUCCESS) {
        /* Connecting failed: destroy the instance and report the error. */
        aiot_mqtt_deinit(&mqtt_handle);
        amp_debug(MOD_STR, "aiot_mqtt_connect failed: -0x%04X", -res);
        return -1;
    }

    /*
     * Dedicated task for aiot_mqtt_process(): sends heartbeats and resends
     * unacknowledged QoS1 packets.
     */
    pyamp_g_app_mqtt_process_thread_running = 1;
    if (aos_task_new_ext(&mqtt_process_task, "mqtt_process",
                         pyamp_aiot_app_mqtt_process_thread, mqtt_handle,
                         1024 * 4, AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt process task create failed!");
        pyamp_g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt process start");

    /*
     * Dedicated task for aiot_mqtt_recv(): receives MQTT messages from the
     * server in a loop and reconnects automatically after a disconnect.
     */
    pyamp_g_app_mqtt_recv_thread_running = 1;
    if (aos_task_new_ext(&mqtt_rec_task, "mqtt_rec",
                         pyamp_aiot_app_mqtt_recv_thread, mqtt_handle, 1024 * 4,
                         AOS_DEFAULT_APP_PRI) != 0) {
        amp_debug(MOD_STR, "management mqtt rec task create failed!");
        /*
         * The process task is already running on this handle; clear its run
         * flag before the handle is destroyed so its loop stops using it.
         */
        pyamp_g_app_mqtt_recv_thread_running = 0;
        pyamp_g_app_mqtt_process_thread_running = 0;
        aiot_mqtt_deinit(&mqtt_handle);
        return -1;
    }
    amp_debug(MOD_STR, "app mqtt rec start");

    *handle = mqtt_handle;
#ifdef AOS_COMP_UAGENT
    res = uagent_mqtt_client_set(mqtt_handle);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent mqtt handle set failed ret = %d\n", res);
    }
    res = uagent_ext_comm_start(product_key, device_name);
    if (res != 0) {
        amp_debug(MOD_STR, "uAgent ext comm  start failed ret = %d\n", res);
    }
#endif

    return STATE_SUCCESS;
}
391 
392 /* mqtt stop */
pyamp_aiot_mqtt_client_stop(void ** handle)393 int32_t pyamp_aiot_mqtt_client_stop(void **handle)
394 {
395     int32_t res = STATE_SUCCESS;
396     void *mqtt_handle = NULL;
397 
398     mqtt_handle = *handle;
399 
400     pyamp_g_app_mqtt_process_thread_running = 0;
401     pyamp_g_app_mqtt_recv_thread_running = 0;
402 
403     /* 断开MQTT连接 */
404     res = aiot_mqtt_disconnect(mqtt_handle);
405     if (res < STATE_SUCCESS) {
406         aiot_mqtt_deinit(&mqtt_handle);
407         amp_debug(MOD_STR, "aiot_mqtt_disconnect failed: -0x%04X", -res);
408         return -1;
409     }
410 
411     /* 销毁MQTT实例 */
412     res = aiot_mqtt_deinit(&mqtt_handle);
413     if (res < STATE_SUCCESS) {
414         amp_debug(MOD_STR, "aiot_mqtt_deinit failed: -0x%04X", -res);
415         return -1;
416     }
417 
418     return res;
419 }
420