1 /*
2 * Copyright (c) 2022 G-Technologies Sdn. Bhd.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/shell/shell_mqtt.h>
9 #include <zephyr/init.h>
10 #include <zephyr/logging/log.h>
11 #include <string.h>
12 #include <stdio.h>
13 #include <zephyr/drivers/hwinfo.h>
14
15 SHELL_MQTT_DEFINE(shell_transport_mqtt);
16 SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
17 CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
18 CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);
19
20 LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
21
22 #define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
23 #define CONNECT_TIMEOUT_MS CONFIG_SHELL_MQTT_CONNECT_TIMEOUT_MS
24 #define LISTEN_TIMEOUT_MS CONFIG_SHELL_MQTT_LISTEN_TIMEOUT_MS
25 #define MQTT_SEND_DELAY_MS K_MSEC(100)
26 #define PROCESS_INTERVAL K_MSEC(CONFIG_SHELL_MQTT_WORK_DELAY_MS)
27 #define SHELL_MQTT_WORKQ_STACK_SIZE 2048
28
29 #ifdef CONFIG_SHELL_MQTT_SERVER_USERNAME
30 #define MQTT_USERNAME CONFIG_SHELL_MQTT_SERVER_USERNAME
31 #else
32 #define MQTT_USERNAME NULL
33 #endif /* CONFIG_SHELL_MQTT_SERVER_USERNAME */
34
35 #ifdef CONFIG_SHELL_MQTT_SERVER_PASSWORD
36 #define MQTT_PASSWORD CONFIG_SHELL_MQTT_SERVER_PASSWORD
37 #else
38 #define MQTT_PASSWORD NULL
#endif /* CONFIG_SHELL_MQTT_SERVER_PASSWORD */
40
41 struct shell_mqtt *sh_mqtt;
42 K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);
43
44 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
45
/* Reschedule a delayable work item on the shell MQTT work queue with the given delay. */
static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
{
	struct k_work_q *const queue = &sh_mqtt->workq;

	return k_work_reschedule_for_queue(queue, dwork, delay);
}
50
sh_mqtt_work_submit(struct k_work * work)51 static inline int sh_mqtt_work_submit(struct k_work *work)
52 {
53 return k_work_submit_to_queue(&sh_mqtt->workq, work);
54 }
55
56 /* Lock the context of the shell mqtt */
sh_mqtt_context_lock(k_timeout_t timeout)57 static inline int sh_mqtt_context_lock(k_timeout_t timeout)
58 {
59 return k_mutex_lock(&sh_mqtt->lock, timeout);
60 }
61
62 /* Unlock the context of the shell mqtt */
sh_mqtt_context_unlock(void)63 static inline void sh_mqtt_context_unlock(void)
64 {
65 (void)k_mutex_unlock(&sh_mqtt->lock);
66 }
67
shell_mqtt_get_devid(char * id,int id_max_len)68 bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
69 {
70 uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
71 ssize_t length;
72
73 length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
74 if (length <= 0) {
75 return false;
76 }
77
78 (void)memset(id, 0, id_max_len);
79 length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len);
80
81 return length > 0;
82 }
83
prepare_fds(struct shell_mqtt * sh)84 static void prepare_fds(struct shell_mqtt *sh)
85 {
86 if (sh->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
87 sh->fds[0].fd = sh->mqtt_cli.transport.tcp.sock;
88 }
89
90 sh->fds[0].events = ZSOCK_POLLIN;
91 sh->nfds = 1;
92 }
93
clear_fds(struct shell_mqtt * sh)94 static void clear_fds(struct shell_mqtt *sh)
95 {
96 sh->nfds = 0;
97 }
98
99 /*
100 * Upon successful completion, poll() shall return a non-negative value. A positive value indicates
101 * the total number of pollfd structures that have selected events (that is, those for which the
102 * revents member is non-zero). A value of 0 indicates that the call timed out and no file
103 * descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate
104 * the error.
105 */
wait(struct shell_mqtt * sh,int timeout)106 static int wait(struct shell_mqtt *sh, int timeout)
107 {
108 int rc = 0;
109
110 if (sh->nfds > 0) {
111 rc = zsock_poll(sh->fds, sh->nfds, timeout);
112 if (rc < 0) {
113 LOG_ERR("poll error: %d", errno);
114 }
115 }
116
117 return rc;
118 }
119
/*
 * Query the IP address for the broker URL.
 *
 * Any address info kept from a previous lookup is released first, then
 * CONFIG_SHELL_MQTT_SERVER_ADDR / CONFIG_SHELL_MQTT_SERVER_PORT are resolved
 * as an IPv4 stream endpoint into sh->haddr.
 *
 * Returns 0 on success, otherwise the non-zero zsock_getaddrinfo() result.
 */
static int get_mqtt_broker_addrinfo(struct shell_mqtt *sh)
{
	int rc;
	struct zsock_addrinfo hints = { .ai_family = AF_INET,
					.ai_socktype = SOCK_STREAM,
					.ai_protocol = 0 };

	if (sh->haddr != NULL) {
		zsock_freeaddrinfo(sh->haddr);
		/* Drop the stale pointer so a failed lookup below can never
		 * lead to the same list being freed again on the next retry.
		 */
		sh->haddr = NULL;
	}

	rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
			       STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh->haddr);
	if (rc == 0) {
		LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
			CONFIG_SHELL_MQTT_SERVER_PORT);

		return 0;
	}

	LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
		CONFIG_SHELL_MQTT_SERVER_PORT);

	return rc;
}
146
147 /* Close MQTT connection properly and cleanup socket */
sh_mqtt_close_and_cleanup(struct shell_mqtt * sh)148 static void sh_mqtt_close_and_cleanup(struct shell_mqtt *sh)
149 {
150 /* Initialize to negative value so that the mqtt_abort case can run */
151 int rc = -1;
152
153 /* If both network & mqtt connected, mqtt_disconnect will send a
154 * disconnection packet to the broker, it will invoke
155 * mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
156 */
157 if ((sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
158 (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
159 rc = mqtt_disconnect(&sh->mqtt_cli, NULL);
160 }
161
162 /* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
163 if (rc < 0) {
164 /* mqtt_abort doesn't send disconnection packet to the broker, but it
165 * makes sure that the MQTT connection is aborted locally and will
166 * always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
167 */
168 (void)mqtt_abort(&sh->mqtt_cli);
169 }
170
171 /* Cleanup socket */
172 clear_fds(sh);
173 }
174
broker_init(struct shell_mqtt * sh)175 static void broker_init(struct shell_mqtt *sh)
176 {
177 struct sockaddr_in *broker4 = (struct sockaddr_in *)&sh->broker;
178
179 broker4->sin_family = AF_INET;
180 broker4->sin_port = htons(CONFIG_SHELL_MQTT_SERVER_PORT);
181
182 net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh->haddr->ai_addr)->sin_addr);
183 }
184
client_init(struct shell_mqtt * sh)185 static void client_init(struct shell_mqtt *sh)
186 {
187 static struct mqtt_utf8 password;
188 static struct mqtt_utf8 username;
189
190 password.utf8 = (uint8_t *)MQTT_PASSWORD;
191 password.size = strlen(MQTT_PASSWORD);
192 username.utf8 = (uint8_t *)MQTT_USERNAME;
193 username.size = strlen(MQTT_USERNAME);
194
195 mqtt_client_init(&sh->mqtt_cli);
196
197 /* MQTT client configuration */
198 sh->mqtt_cli.broker = &sh->broker;
199 sh->mqtt_cli.evt_cb = mqtt_evt_handler;
200 sh->mqtt_cli.client_id.utf8 = (uint8_t *)sh->device_id;
201 sh->mqtt_cli.client_id.size = strlen(sh->device_id);
202 sh->mqtt_cli.password = &password;
203 sh->mqtt_cli.user_name = &username;
204 sh->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
205
206 /* MQTT buffers configuration */
207 sh->mqtt_cli.rx_buf = sh->buf.rx;
208 sh->mqtt_cli.rx_buf_size = sizeof(sh->buf.rx);
209 sh->mqtt_cli.tx_buf = sh->buf.tx;
210 sh->mqtt_cli.tx_buf_size = sizeof(sh->buf.tx);
211
212 /* MQTT transport configuration */
213 sh->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
214 }
215
216 /* Work routine to process MQTT packet and keep alive MQTT connection */
sh_mqtt_process_handler(struct k_work * work)217 static void sh_mqtt_process_handler(struct k_work *work)
218 {
219 ARG_UNUSED(work);
220 struct shell_mqtt *sh = sh_mqtt;
221 int rc;
222 int64_t remaining = LISTEN_TIMEOUT_MS;
223 int64_t start_time = k_uptime_get();
224
225 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
226 LOG_DBG("%s_work while %s", "process", "network disconnected");
227 return;
228 }
229
230 /* If context can't be locked, that means net conn cb locked it */
231 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
232 /* In that case we should simply return */
233 LOG_DBG("%s_work unable to lock context", "process");
234 return;
235 }
236
237 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
238 LOG_DBG("MQTT %s", "not connected");
239 goto process_error;
240 }
241
242 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
243 LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
244 goto process_error;
245 }
246
247 LOG_DBG("MQTT %s", "Processing");
248 /* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
249 while ((remaining > 0) && (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
250 (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
251 (sh->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
252 LOG_DBG("Listening to socket");
253 rc = wait(sh, remaining);
254 if (rc > 0) {
255 LOG_DBG("Process socket for MQTT packet");
256 rc = mqtt_input(&sh->mqtt_cli);
257 if (rc != 0) {
258 LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
259 goto process_error;
260 }
261 } else if (rc < 0) {
262 goto process_error;
263 }
264
265 LOG_DBG("MQTT %s", "Keepalive");
266 rc = mqtt_live(&sh->mqtt_cli);
267 if ((rc != 0) && (rc != -EAGAIN)) {
268 LOG_ERR("%s error: %d", "mqtt_live", rc);
269 goto process_error;
270 }
271
272 remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
273 }
274
275 /* Reschedule the process work */
276 LOG_DBG("Scheduling %s work", "process");
277 (void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
278 sh_mqtt_context_unlock();
279 return;
280
281 process_error:
282 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
283 sh_mqtt_close_and_cleanup(sh);
284 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
285 sh_mqtt_context_unlock();
286 }
287
sh_mqtt_subscribe_handler(struct k_work * work)288 static void sh_mqtt_subscribe_handler(struct k_work *work)
289 {
290 ARG_UNUSED(work);
291 struct shell_mqtt *sh = sh_mqtt;
292
293 /* Subscribe config information */
294 struct mqtt_topic subs_topic = { .topic = { .utf8 = sh->sub_topic,
295 .size = strlen(sh->sub_topic) },
296 .qos = MQTT_QOS_1_AT_LEAST_ONCE };
297 const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
298 .list_count = 1U,
299 .message_id = 1U };
300 int rc;
301
302 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
303 LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
304 return;
305 }
306
307 /* If context can't be locked, that means net conn cb locked it */
308 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
309 /* In that case we should simply return */
310 LOG_DBG("%s_work unable to lock context", "subscribe");
311 return;
312 }
313
314 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
315 LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
316 goto subscribe_error;
317 }
318
319 rc = mqtt_subscribe(&sh->mqtt_cli, &subs_list);
320 if (rc == 0) {
321 /* Wait for mqtt's connack */
322 LOG_DBG("Listening to socket");
323 rc = wait(sh, CONNECT_TIMEOUT_MS);
324 if (rc > 0) {
325 LOG_DBG("Process socket for MQTT packet");
326 rc = mqtt_input(&sh->mqtt_cli);
327 if (rc != 0) {
328 LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
329 goto subscribe_error;
330 }
331 } else if (rc < 0) {
332 goto subscribe_error;
333 }
334
335 /* No suback, fail */
336 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
337 goto subscribe_error;
338 }
339
340 LOG_DBG("Scheduling MQTT process work");
341 (void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
342 sh_mqtt_context_unlock();
343
344 LOG_INF("Logs will be published to: %s", sh->pub_topic);
345 LOG_INF("Subscribing shell cmds from: %s", sh->sub_topic);
346
347 return;
348 }
349
350 subscribe_error:
351 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
352 sh_mqtt_close_and_cleanup(sh);
353 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
354 sh_mqtt_context_unlock();
355 }
356
357 /* Work routine to connect to MQTT */
sh_mqtt_connect_handler(struct k_work * work)358 static void sh_mqtt_connect_handler(struct k_work *work)
359 {
360 ARG_UNUSED(work);
361 struct shell_mqtt *sh = sh_mqtt;
362 int rc;
363
364 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
365 LOG_DBG("%s_work while %s", "connect", "network disconnected");
366 return;
367 }
368
369 /* If context can't be locked, that means net conn cb locked it */
370 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
371 /* In that case we should simply return */
372 LOG_DBG("%s_work unable to lock context", "connect");
373 return;
374 }
375
376 if (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
377 __ASSERT(0, "MQTT shouldn't be already connected");
378 LOG_ERR("MQTT shouldn't be already connected");
379 goto connect_error;
380 }
381
382 /* Resolve the broker URL */
383 LOG_DBG("Resolving DNS");
384 rc = get_mqtt_broker_addrinfo(sh);
385 if (rc != 0) {
386 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
387 sh_mqtt_context_unlock();
388 return;
389 }
390
391 LOG_DBG("Initializing MQTT client");
392 broker_init(sh);
393 client_init(sh);
394
395 /* Try to connect to mqtt */
396 LOG_DBG("Connecting to MQTT broker");
397 rc = mqtt_connect(&sh->mqtt_cli);
398 if (rc != 0) {
399 LOG_ERR("%s error: %d", "mqtt_connect", rc);
400 goto connect_error;
401 }
402
403 /* Prepare port config */
404 LOG_DBG("Preparing socket");
405 prepare_fds(sh);
406
407 /* Wait for mqtt's connack */
408 LOG_DBG("Listening to socket");
409 rc = wait(sh, CONNECT_TIMEOUT_MS);
410 if (rc > 0) {
411 LOG_DBG("Process socket for MQTT packet");
412 rc = mqtt_input(&sh->mqtt_cli);
413 if (rc != 0) {
414 LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
415 goto connect_error;
416 }
417 } else if (rc < 0) {
418 goto connect_error;
419 }
420
421 /* No connack, fail */
422 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
423 goto connect_error;
424 }
425
426 LOG_DBG("Scheduling %s work", "subscribe");
427 (void)sh_mqtt_work_reschedule(&sh->subscribe_dwork, PROCESS_INTERVAL);
428 sh_mqtt_context_unlock();
429 return;
430
431 connect_error:
432 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
433 sh_mqtt_close_and_cleanup(sh);
434 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
435 sh_mqtt_context_unlock();
436 }
437
sh_mqtt_publish(struct shell_mqtt * sh,uint8_t * data,uint32_t len)438 static int sh_mqtt_publish(struct shell_mqtt *sh, uint8_t *data, uint32_t len)
439 {
440 sh->pub_data.message.payload.data = data;
441 sh->pub_data.message.payload.len = len;
442 sh->pub_data.message_id++;
443
444 return mqtt_publish(&sh->mqtt_cli, &sh->pub_data);
445 }
446
sh_mqtt_publish_tx_buf(struct shell_mqtt * sh,bool is_work)447 static int sh_mqtt_publish_tx_buf(struct shell_mqtt *sh, bool is_work)
448 {
449 int rc;
450
451 rc = sh_mqtt_publish(sh, &sh->tx_buf.buf[0], sh->tx_buf.len);
452 memset(&sh->tx_buf, 0, sizeof(sh->tx_buf));
453 if (rc != 0) {
454 LOG_ERR("MQTT publish error: %d", rc);
455 return rc;
456 }
457
458 /* Arbitrary delay to not kill the session */
459 if (!is_work) {
460 k_sleep(MQTT_SEND_DELAY_MS);
461 }
462
463 return rc;
464 }
465
sh_mqtt_publish_handler(struct k_work * work)466 static void sh_mqtt_publish_handler(struct k_work *work)
467 {
468 ARG_UNUSED(work);
469 struct shell_mqtt *sh = sh_mqtt;
470 int rc;
471
472 (void)sh_mqtt_context_lock(K_FOREVER);
473
474 rc = sh_mqtt_publish_tx_buf(sh, true);
475 if (rc != 0) {
476 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
477 sh_mqtt_close_and_cleanup(sh);
478 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
479 }
480
481 sh_mqtt_context_unlock();
482 }
483
cancel_dworks_and_cleanup(struct shell_mqtt * sh)484 static void cancel_dworks_and_cleanup(struct shell_mqtt *sh)
485 {
486 (void)k_work_cancel_delayable(&sh->connect_dwork);
487 (void)k_work_cancel_delayable(&sh->subscribe_dwork);
488 (void)k_work_cancel_delayable(&sh->process_dwork);
489 (void)k_work_cancel_delayable(&sh->publish_dwork);
490 sh_mqtt_close_and_cleanup(sh);
491 }
492
net_disconnect_handler(struct k_work * work)493 static void net_disconnect_handler(struct k_work *work)
494 {
495 ARG_UNUSED(work);
496 struct shell_mqtt *sh = sh_mqtt;
497
498 LOG_WRN("Network %s", "disconnected");
499 sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
500
501 /* Stop all possible work */
502 (void)sh_mqtt_context_lock(K_FOREVER);
503 cancel_dworks_and_cleanup(sh);
504 sh_mqtt_context_unlock();
505 /* If the transport was requested, the connect work will be rescheduled
506 * when internet is connected again
507 */
508 }
509
510 /* Network connection event handler */
network_evt_handler(struct net_mgmt_event_callback * cb,uint64_t mgmt_event,struct net_if * iface)511 static void network_evt_handler(struct net_mgmt_event_callback *cb, uint64_t mgmt_event,
512 struct net_if *iface)
513 {
514 struct shell_mqtt *sh = sh_mqtt;
515
516 if ((mgmt_event == NET_EVENT_L4_CONNECTED) &&
517 (sh->network_state == SHELL_MQTT_NETWORK_DISCONNECTED)) {
518 LOG_WRN("Network %s", "connected");
519 sh->network_state = SHELL_MQTT_NETWORK_CONNECTED;
520 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
521 } else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) &&
522 (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED)) {
523 (void)sh_mqtt_work_submit(&sh->net_disconnected_work);
524 }
525 }
526
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)527 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
528 {
529 struct shell_mqtt *sh = sh_mqtt;
530
531 switch (evt->type) {
532 case MQTT_EVT_CONNACK:
533 if (evt->result != 0) {
534 sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
535 LOG_ERR("MQTT %s %d", "connect failed", evt->result);
536 break;
537 }
538
539 sh->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
540 LOG_WRN("MQTT %s", "client connected!");
541 break;
542
543 case MQTT_EVT_SUBACK:
544 if (evt->result != 0) {
545 LOG_ERR("MQTT subscribe: %s", "error");
546 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
547 break;
548 }
549
550 LOG_WRN("MQTT subscribe: %s", "ok");
551 sh->subscribe_state = SHELL_MQTT_SUBSCRIBED;
552 break;
553
554 case MQTT_EVT_UNSUBACK:
555 LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
556 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
557 break;
558
559 case MQTT_EVT_DISCONNECT:
560 LOG_WRN("MQTT disconnected: %d", evt->result);
561 sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
562 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
563 break;
564
565 case MQTT_EVT_PUBLISH: {
566 const struct mqtt_publish_param *pub = &evt->param.publish;
567 uint32_t payload_left;
568 size_t size;
569 int rc;
570
571 payload_left = pub->message.payload.len;
572
573 LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
574 LOG_DBG(" id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
575 LOG_DBG(" item: %s", pub->message.topic.topic.utf8);
576
577 /* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
578 if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
579 struct mqtt_puback_param puback = { .message_id = pub->message_id };
580
581 (void)mqtt_publish_qos1_ack(client, &puback);
582 }
583
584 while (payload_left > 0) {
585 /* Attempt to claim `payload_left` bytes of buffer in rb */
586 size = (size_t)ring_buf_put_claim(&sh->rx_rb, &sh->rx_rb_ptr,
587 payload_left);
588 /* Read `size` bytes of payload from mqtt */
589 rc = mqtt_read_publish_payload_blocking(client, sh->rx_rb_ptr, size);
590
591 /* errno value, return */
592 if (rc < 0) {
593 ring_buf_reset(&sh->rx_rb);
594 return;
595 }
596
597 size = (size_t)rc;
598 /* Indicate that `size` bytes of payload has been written into rb */
599 (void)ring_buf_put_finish(&sh->rx_rb, size);
600 /* Update `payload_left` */
601 payload_left -= size;
602 /* Tells the shell that we have new data for it */
603 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
604 /* Arbitrary sleep for the shell to do its thing */
605 (void)k_msleep(100);
606 }
607
608 /* Shell won't execute the cmds without \r\n */
609 while (true) {
610 /* Check if rb's free space is enough to fit in \r\n */
611 size = ring_buf_space_get(&sh->rx_rb);
612 if (size >= sizeof("\r\n")) {
613 (void)ring_buf_put(&sh->rx_rb, "\r\n", sizeof("\r\n"));
614 break;
615 }
616 /* Arbitrary sleep for the shell to do its thing */
617 (void)k_msleep(100);
618 }
619
620 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
621 break;
622 }
623
624 case MQTT_EVT_PUBACK:
625 if (evt->result != 0) {
626 LOG_ERR("MQTT PUBACK error %d", evt->result);
627 break;
628 }
629
630 LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
631 break;
632
633 case MQTT_EVT_PINGRESP:
634 LOG_DBG("PINGRESP packet");
635 break;
636
637 default:
638 LOG_DBG("MQTT event received %d", evt->type);
639 break;
640 }
641 }
642
/*
 * Shell transport `init` hook.
 *
 * Binds the global context pointer to the transport's context, zeroes it and
 * then sets up: the context mutex, the device id and the derived
 * "<id>_tx"/"<id>_rx" topics, the RX ring buffer, the shell callback, the
 * publish parameters, the dedicated work queue with its work items, and the
 * network management event callback.
 *
 * `config` is not used. Always returns 0.
 */
static int init(const struct shell_transport *transport, const void *config,
		shell_transport_handler_t evt_handler, void *context)
{
	sh_mqtt = (struct shell_mqtt *)transport->ctx;
	struct shell_mqtt *sh = sh_mqtt;

	ARG_UNUSED(config);

	(void)memset(sh, 0, sizeof(struct shell_mqtt));

	(void)k_mutex_init(&sh->lock);

	if (!shell_mqtt_get_devid(sh->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
		LOG_ERR("Unable to get device identity, using dummy value");
		/* Bound the write by the destination capacity, not the source */
		(void)snprintf(sh->device_id, DEVICE_ID_HEX_MAX_SIZE, "%s", "dummy");
	}

	LOG_DBG("Client ID is %s", sh->device_id);

	(void)snprintf(sh->pub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_tx", sh->device_id);
	(void)snprintf(sh->sub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_rx", sh->device_id);

	ring_buf_init(&sh->rx_rb, RX_RB_SIZE, sh->rx_rb_buf);

	LOG_DBG("Initializing shell MQTT backend");

	sh->shell_handler = evt_handler;
	sh->shell_context = context;

	sh->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
	sh->pub_data.message.topic.topic.utf8 = (uint8_t *)sh->pub_topic;
	/* Measure the char buffer itself rather than its uint8_t alias */
	sh->pub_data.message.topic.topic.size = strlen(sh->pub_topic);
	sh->pub_data.dup_flag = 0U;
	sh->pub_data.retain_flag = 0U;

	/* Initialize the work queue */
	k_work_queue_init(&sh->workq);
	k_work_queue_start(&sh->workq, sh_mqtt_workq_stack,
			   K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
	(void)k_thread_name_set(&sh->workq.thread, "sh_mqtt_workq");
	k_work_init(&sh->net_disconnected_work, net_disconnect_handler);
	k_work_init_delayable(&sh->connect_dwork, sh_mqtt_connect_handler);
	k_work_init_delayable(&sh->subscribe_dwork, sh_mqtt_subscribe_handler);
	k_work_init_delayable(&sh->process_dwork, sh_mqtt_process_handler);
	k_work_init_delayable(&sh->publish_dwork, sh_mqtt_publish_handler);

	LOG_DBG("Initializing listener for network");
	net_mgmt_init_event_callback(&sh->mgmt_cb, network_evt_handler, NET_EVENT_MASK);

	sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
	sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
	sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;

	return 0;
}
697
uninit(const struct shell_transport * transport)698 static int uninit(const struct shell_transport *transport)
699 {
700 ARG_UNUSED(transport);
701 struct shell_mqtt *sh = sh_mqtt;
702
703 /* Not initialized yet */
704 if (sh == NULL) {
705 return -ENODEV;
706 }
707
708 return 0;
709 }
710
enable(const struct shell_transport * transport,bool blocking)711 static int enable(const struct shell_transport *transport, bool blocking)
712 {
713 ARG_UNUSED(transport);
714 ARG_UNUSED(blocking);
715 struct shell_mqtt *sh = sh_mqtt;
716
717 /* Not initialized yet */
718 if (sh == NULL) {
719 return -ENODEV;
720 }
721
722 /* Listen for network connection status */
723 net_mgmt_add_event_callback(&sh->mgmt_cb);
724 conn_mgr_mon_resend_status();
725
726 return 0;
727 }
728
write_data(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)729 static int write_data(const struct shell_transport *transport, const void *data, size_t length,
730 size_t *cnt)
731 {
732 ARG_UNUSED(transport);
733 struct shell_mqtt *sh = sh_mqtt;
734 int rc = 0;
735 struct k_work_sync ws;
736 size_t copy_len;
737
738 *cnt = 0;
739
740 /* Not initialized yet */
741 if (sh == NULL) {
742 return -ENODEV;
743 }
744
745 /* Not connected to broker */
746 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
747 goto out;
748 }
749
750 (void)k_work_cancel_delayable_sync(&sh->publish_dwork, &ws);
751
752 do {
753 if ((sh->tx_buf.len + length - *cnt) > TX_BUF_SIZE) {
754 copy_len = TX_BUF_SIZE - sh->tx_buf.len;
755 } else {
756 copy_len = length - *cnt;
757 }
758
759 memcpy(sh->tx_buf.buf + sh->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
760 sh->tx_buf.len += copy_len;
761
762 /* Send the data immediately if the buffer is full */
763 if (sh->tx_buf.len == TX_BUF_SIZE) {
764 rc = sh_mqtt_publish_tx_buf(sh, false);
765 if (rc != 0) {
766 sh_mqtt_close_and_cleanup(sh);
767 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
768 *cnt = length;
769 return rc;
770 }
771 }
772
773 *cnt += copy_len;
774 } while (*cnt < length);
775
776 if (sh->tx_buf.len > 0) {
777 (void)sh_mqtt_work_reschedule(&sh->publish_dwork, MQTT_SEND_DELAY_MS);
778 }
779
780 /* Inform shell that it is ready for next TX */
781 sh->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh->shell_context);
782
783 out:
784 /* We will always assume that we sent everything */
785 *cnt = length;
786 return rc;
787 }
788
read_data(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)789 static int read_data(const struct shell_transport *transport, void *data, size_t length,
790 size_t *cnt)
791 {
792 ARG_UNUSED(transport);
793 struct shell_mqtt *sh = sh_mqtt;
794
795 /* Not initialized yet */
796 if (sh == NULL) {
797 return -ENODEV;
798 }
799
800 /* Not subscribed yet */
801 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
802 *cnt = 0;
803 return 0;
804 }
805
806 *cnt = ring_buf_get(&sh->rx_rb, data, length);
807
808 /* Inform the shell if there are still data in the rb */
809 if (ring_buf_size_get(&sh->rx_rb) > 0) {
810 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
811 }
812
813 return 0;
814 }
815
816 const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
817 .uninit = uninit,
818 .enable = enable,
819 .write = write_data,
820 .read = read_data };
821
enable_shell_mqtt(void)822 static int enable_shell_mqtt(void)
823 {
824
825 bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
826 uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
827 CONFIG_LOG_MAX_LEVEL :
828 CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
829 static const struct shell_backend_config_flags cfg_flags = {
830 .insert_mode = 0,
831 .echo = 0,
832 .obscure = 0,
833 .mode_delete = 0,
834 .use_colors = 0,
835 .use_vt100 = 0,
836 };
837
838 return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
839 }
840
841 /* Function is used for testing purposes */
shell_backend_mqtt_get_ptr(void)842 const struct shell *shell_backend_mqtt_get_ptr(void)
843 {
844 return &shell_mqtt;
845 }
846
847 SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
848