1 #include "iotx_cm_internal.h"
2
3 #if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED)
4 #include "iotx_cm_mqtt.h"
5 #endif
6 #ifdef COAP_COMM_ENABLED
7 #include "iotx_cm_coap.h"
8 #endif
9
10 static void *fd_lock = NULL;
11 static iotx_cm_connection_t *_cm_fd[CM_MAX_FD_NUM] = { NULL };
12 static int _get_fd(iotx_cm_connection_t *handle);
13 static int _recycle_fd(int fd);
14 static inline int _fd_is_valid(int fd);
15 static int inited_conn_num = 0;
16
17 #ifdef DEVICE_MODEL_GATEWAY
18 static void *_iotx_cm_yield_thread_func(void *params);
19 static void *yield_thread = NULL;
20 static int yield_task_leave = 1;
21 #endif
22
23 const char ERR_INVALID_PARAMS[] = "invalid parameter";
24
/**
 * Open a connection for the protocol selected in params and register it in
 * the fd table.
 *
 * @param params  Initialisation parameters; params->protocol_type selects the
 *                backend (MQTT or CoAP, each only when compiled in).
 *
 * @return The allocated fd (index into the fd table, >= 0) on success,
 *         -1 when params is NULL, the backend is unavailable or fails to
 *         open, or no fd slot is free.
 */
int iotx_cm_open(iotx_cm_init_param_t *params)
{
    int fd;
    iotx_cm_connection_t *connection = NULL;

    /* Reject a NULL parameter block instead of dereferencing it below. */
    if (params == NULL) {
        cm_err(ERR_INVALID_PARAMS);
        return -1;
    }

    switch (params->protocol_type) {
        case IOTX_CM_PROTOCOL_TYPE_MQTT:
#if defined(MQTT_COMM_ENABLED) || defined(MAL_ENABLED)
            connection = iotx_cm_open_mqtt(params);
#endif
            break;
        case IOTX_CM_PROTOCOL_TYPE_COAP:
#ifdef COAP_COMM_ENABLED
            connection = iotx_cm_open_coap(params);
#endif
            break;
        default:
            break;
    }

    /* Also reached when the requested protocol was compiled out. */
    if (connection == NULL) {
        cm_err("cm opon failed");
        return -1;
    }

    fd = _get_fd(connection);
    if (fd < 0) {
        cm_err("get fd failed");
        /* Undo the backend open so the connection is not left dangling. */
        if (connection->close_func != NULL) {
            connection->close_func();
        }
        return -1;
    }

    connection->fd = fd;
    return fd;
}
58
iotx_cm_connect(int fd,uint32_t timeout)59 int iotx_cm_connect(int fd, uint32_t timeout)
60 {
61 iotx_cm_connect_fp connect_func;
62 int ret;
63
64 if (_fd_is_valid(fd) < 0) {
65 cm_err(ERR_INVALID_PARAMS);
66 return -1;
67 }
68 HAL_MutexLock(fd_lock);
69 connect_func = _cm_fd[fd]->connect_func;
70 HAL_MutexUnlock(fd_lock);
71
72 iotx_event_post(IOTX_CONN_CLOUD);
73
74 ret = connect_func(timeout);
75
76 if (ret == 0) {
77 inited_conn_num++;
78 if (inited_conn_num == 1) {
79 #ifdef DEVICE_MODEL_GATEWAY
80 int stack_used;
81 hal_os_thread_param_t task_parms = { 0 };
82 task_parms.stack_size = 6144;
83 task_parms.name = "cm_yield";
84 ret = HAL_ThreadCreate(&yield_thread, _iotx_cm_yield_thread_func,
85 NULL, &task_parms, &stack_used);
86 if (ret < 0) {
87 inited_conn_num--;
88 }
89 #endif
90 }
91 iotx_event_post(IOTX_CONN_CLOUD_SUC);
92 } else {
93 iotx_event_post(IOTX_CONN_CLOUD_FAIL);
94 }
95
96 return ret;
97 }
98
_iotx_cm_yield(int fd,unsigned int timeout)99 static int _iotx_cm_yield(int fd, unsigned int timeout)
100 {
101 iotx_cm_yield_fp yield_func;
102
103 if (fd_lock == NULL) {
104 return NULL_VALUE_ERROR;
105 }
106
107 if (fd == -1) {
108 int i;
109 for (i = 0; i < CM_MAX_FD_NUM; i++) {
110 yield_func = NULL;
111 HAL_MutexLock(fd_lock);
112 if (_cm_fd[i] != NULL) {
113 yield_func = _cm_fd[i]->yield_func;
114 }
115 HAL_MutexUnlock(fd_lock);
116 if (yield_func != NULL) {
117 yield_func(timeout);
118 }
119 }
120 return 0;
121 }
122
123 if (_fd_is_valid(fd) < 0) {
124 cm_err(ERR_INVALID_PARAMS);
125 return -1;
126 }
127
128 HAL_MutexLock(fd_lock);
129 yield_func = _cm_fd[fd]->yield_func;
130 HAL_MutexUnlock(fd_lock);
131 return yield_func(timeout);
132 }
#ifdef DEVICE_MODEL_GATEWAY
/**
 * Entry point of the background yield task.
 *
 * Clears yield_task_leave on entry, keeps yielding on every registered
 * connection for as long as inited_conn_num stays positive, then sets
 * yield_task_leave again just before returning.
 *
 * @param params  Unused.
 * @return Always NULL.
 */
static void *_iotx_cm_yield_thread_func(void *params)
{
    (void)params;

    yield_task_leave = 0;
    for (;;) {
        if (inited_conn_num <= 0) {
            break;
        }
        _iotx_cm_yield(-1, CM_DEFAULT_YIELD_TIMEOUT);
    }
    yield_task_leave = 1;

    return NULL;
}
#endif
144
/**
 * Public yield entry point.
 *
 * In DEVICE_MODEL_GATEWAY builds this does nothing and returns 0; in all
 * other builds it forwards to _iotx_cm_yield().
 *
 * @param fd       fd to yield on, or -1 for every registered connection.
 * @param timeout  Passed unchanged to _iotx_cm_yield().
 * @return 0 in gateway builds, otherwise the result of _iotx_cm_yield().
 */
int iotx_cm_yield(int fd, unsigned int timeout)
{
#ifndef DEVICE_MODEL_GATEWAY
    return _iotx_cm_yield(fd, timeout);
#else
    (void)fd;
    (void)timeout;
    return 0;
#endif
}
153
/**
 * Subscribe through the connection registered under fd.
 *
 * @param fd                 fd returned by iotx_cm_open().
 * @param ext                Passed unchanged to the connection's sub_func.
 * @param topic              Passed unchanged to the connection's sub_func.
 * @param topic_handle_func  Passed unchanged to the connection's sub_func.
 * @param pcontext           Passed unchanged to the connection's sub_func.
 *
 * @return -1 when fd is invalid, otherwise the result of sub_func.
 */
int iotx_cm_sub(int fd, iotx_cm_ext_params_t *ext, const char *topic,
                iotx_cm_data_handle_cb topic_handle_func, void *pcontext)
{
    iotx_cm_connection_t *conn;
    iotx_cm_sub_fp sub_func = NULL;

    if (_fd_is_valid(fd) < 0) {
        cm_err(ERR_INVALID_PARAMS);
        return -1;
    }

    /*
     * The slot can be recycled between _fd_is_valid() and this point, so
     * check it again under the lock before dereferencing it.
     */
    HAL_MutexLock(fd_lock);
    conn = _cm_fd[fd];
    if (conn != NULL) {
        sub_func = conn->sub_func;
    }
    HAL_MutexUnlock(fd_lock);

    if (sub_func == NULL) {
        cm_err(ERR_INVALID_PARAMS);
        return -1;
    }
    return sub_func(ext, topic, topic_handle_func, pcontext);
}
169
iotx_cm_unsub(int fd,const char * topic)170 int iotx_cm_unsub(int fd, const char *topic)
171 {
172 iotx_cm_unsub_fp unsub_func;
173
174 if (_fd_is_valid(fd) < 0) {
175 cm_err(ERR_INVALID_PARAMS);
176 return -1;
177 }
178
179 HAL_MutexLock(fd_lock);
180 unsub_func = _cm_fd[fd]->unsub_func;
181 HAL_MutexUnlock(fd_lock);
182 return unsub_func(topic);
183 }
184
/**
 * Publish through the connection registered under fd.
 *
 * @param fd           fd returned by iotx_cm_open().
 * @param ext          Passed unchanged to the connection's pub_func.
 * @param topic        Passed unchanged to the connection's pub_func.
 * @param payload      Passed unchanged to the connection's pub_func.
 * @param payload_len  Passed unchanged to the connection's pub_func.
 *
 * @return -1 when fd is invalid, otherwise the result of pub_func.
 */
int iotx_cm_pub(int fd, iotx_cm_ext_params_t *ext, const char *topic,
                const char *payload, unsigned int payload_len)
{
    iotx_cm_connection_t *conn;
    iotx_cm_pub_fp pub_func = NULL;

    if (_fd_is_valid(fd) < 0) {
        cm_err(ERR_INVALID_PARAMS);
        return -1;
    }

    /*
     * The slot can be recycled between _fd_is_valid() and this point, so
     * check it again under the lock before dereferencing it.
     */
    HAL_MutexLock(fd_lock);
    conn = _cm_fd[fd];
    if (conn != NULL) {
        pub_func = conn->pub_func;
    }
    HAL_MutexUnlock(fd_lock);

    if (pub_func == NULL) {
        cm_err(ERR_INVALID_PARAMS);
        return -1;
    }
    return pub_func(ext, topic, payload, payload_len);
}
200
iotx_cm_close(int fd)201 int iotx_cm_close(int fd)
202 {
203 iotx_cm_close_fp close_func;
204
205 if (_fd_is_valid(fd) < 0) {
206 cm_err(ERR_INVALID_PARAMS);
207 return -1;
208 }
209
210 if (inited_conn_num > 0) {
211 inited_conn_num--;
212 }
213
214 if (inited_conn_num == 0) {
215 #ifdef DEVICE_MODEL_GATEWAY
216 while (!yield_task_leave) {
217 HAL_SleepMs(10);
218 }
219 yield_thread = NULL;
220 #endif
221 }
222
223 HAL_MutexLock(fd_lock);
224 close_func = _cm_fd[fd]->close_func;
225 HAL_MutexUnlock(fd_lock);
226 if (close_func() != 0) {
227 return -1;
228 }
229 if (_recycle_fd(fd) != 0) {
230 return -1;
231 }
232
233 if (inited_conn_num == 0) {
234 if (fd_lock != NULL) {
235 HAL_MutexDestroy(fd_lock);
236 fd_lock = NULL;
237 }
238 }
239
240 return 0;
241 }
242
_fd_is_valid(int fd)243 static inline int _fd_is_valid(int fd)
244 {
245 int ret;
246
247 if (fd_lock == NULL) {
248 return NULL_VALUE_ERROR;
249 }
250
251 HAL_MutexLock(fd_lock);
252 ret = (fd >= 0 && fd < CM_MAX_FD_NUM && _cm_fd[fd] != NULL) ? 0 : -1;
253 HAL_MutexUnlock(fd_lock);
254 return ret;
255 }
256
_recycle_fd(int fd)257 static int _recycle_fd(int fd)
258 {
259 if (fd_lock == NULL) {
260 fd_lock = HAL_MutexCreate();
261 if (fd_lock == NULL) {
262 return -1;
263 }
264 }
265
266 if (fd < 0 || fd > CM_MAX_FD_NUM - 1) {
267 return -1;
268 }
269
270 HAL_MutexLock(fd_lock);
271 _cm_fd[fd] = NULL;
272 HAL_MutexUnlock(fd_lock);
273
274 return 0;
275 }
276
/**
 * Register handle in the first free slot of the fd table.
 *
 * The fd table lock is created first if it does not exist yet.
 *
 * @param handle  Connection to register.
 *
 * @return The slot index (>= 0) now holding handle, NULL_VALUE_ERROR when
 *         handle is NULL, or -1 when the lock cannot be created or every
 *         slot is taken.
 */
static int _get_fd(iotx_cm_connection_t *handle)
{
    int idx;
    int slot = -1;

    if (handle == NULL) {
        return NULL_VALUE_ERROR;
    }

    if (fd_lock == NULL) {
        fd_lock = HAL_MutexCreate();
        if (fd_lock == NULL) {
            return -1;
        }
    }

    HAL_MutexLock(fd_lock);
    /* Stop scanning as soon as a slot has been claimed. */
    for (idx = 0; idx < CM_MAX_FD_NUM && slot < 0; idx++) {
        if (_cm_fd[idx] == NULL) {
            _cm_fd[idx] = handle;
            slot = idx;
        }
    }
    HAL_MutexUnlock(fd_lock);

    if (slot < 0) {
        cm_err("cm fd reached the limit");
        return -1;
    }
    return slot;
}
303