1 /*
2 * Copyright (c) 2018 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /** @file mqtt.c
8 *
9 * @brief MQTT Client API Implementation.
10 */
11
12 #include <zephyr/logging/log.h>
13 LOG_MODULE_REGISTER(net_mqtt, CONFIG_MQTT_LOG_LEVEL);
14
15 #include <zephyr/net/mqtt.h>
16
17 #include "mqtt_transport.h"
18 #include "mqtt_internal.h"
19 #include "mqtt_os.h"
20
client_reset(struct mqtt_client * client)21 static void client_reset(struct mqtt_client *client)
22 {
23 MQTT_STATE_INIT(client);
24
25 client->internal.last_activity = 0U;
26 client->internal.rx_buf_datalen = 0U;
27 client->internal.remaining_payload = 0U;
28 }
29
30 /** @brief Initialize tx buffer. */
tx_buf_init(struct mqtt_client * client,struct buf_ctx * buf)31 static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf)
32 {
33 memset(client->tx_buf, 0, client->tx_buf_size);
34 buf->cur = client->tx_buf;
35 buf->end = client->tx_buf + client->tx_buf_size;
36 }
37
event_notify(struct mqtt_client * client,const struct mqtt_evt * evt)38 void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt)
39 {
40 if (client->evt_cb != NULL) {
41 mqtt_mutex_unlock(client);
42
43 client->evt_cb(client, evt);
44
45 mqtt_mutex_lock(client);
46 }
47 }
48
mqtt_client_disconnect(struct mqtt_client * client,int result,bool notify)49 void mqtt_client_disconnect(struct mqtt_client *client, int result, bool notify)
50 {
51 int err_code;
52
53 err_code = mqtt_transport_disconnect(client);
54 if (err_code < 0) {
55 NET_ERR("Failed to disconnect transport!");
56 }
57
58 /* Reset internal state. */
59 client_reset(client);
60
61 if (notify) {
62 struct mqtt_evt evt = {
63 .type = MQTT_EVT_DISCONNECT,
64 .result = result,
65 };
66
67 /* Notify application. */
68 event_notify(client, &evt);
69 }
70 }
71
client_connect(struct mqtt_client * client)72 static int client_connect(struct mqtt_client *client)
73 {
74 int err_code;
75 struct buf_ctx packet;
76
77 err_code = mqtt_transport_connect(client);
78 if (err_code < 0) {
79 return err_code;
80 }
81
82 tx_buf_init(client, &packet);
83 MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED);
84
85 err_code = connect_request_encode(client, &packet);
86 if (err_code < 0) {
87 goto error;
88 }
89
90 /* Send MQTT identification message to broker. */
91 err_code = mqtt_transport_write(client, packet.cur,
92 packet.end - packet.cur);
93 if (err_code < 0) {
94 goto error;
95 }
96
97 client->internal.last_activity = mqtt_sys_tick_in_ms_get();
98
99 /* Reset the unanswered ping count for a new connection */
100 client->unacked_ping = 0;
101
102 NET_INFO("Connect completed");
103
104 return 0;
105
106 error:
107 mqtt_client_disconnect(client, err_code, false);
108 return err_code;
109 }
110
111 static int client_write(struct mqtt_client *client, const uint8_t *data,
112 uint32_t datalen);
113
114 #if defined(CONFIG_MQTT_VERSION_5_0)
disconnect_5_0_notify(struct mqtt_client * client,int err)115 static void disconnect_5_0_notify(struct mqtt_client *client, int err)
116 {
117 struct mqtt_disconnect_param param = { };
118 struct buf_ctx packet;
119
120 /* Parser might've set custom failure reason, in such case skip generic
121 * mapping from errno to reason code.
122 */
123 if (client->internal.disconnect_reason != MQTT_DISCONNECT_NORMAL) {
124 param.reason_code = client->internal.disconnect_reason;
125 } else {
126 switch (err) {
127 case ECONNREFUSED:
128 case ENOTCONN:
129 /* Connection rejected/closed, skip disconnect. */
130 return;
131 case EINVAL:
132 param.reason_code = MQTT_DISCONNECT_MALFORMED_PACKET;
133 break;
134 case EBADMSG:
135 param.reason_code = MQTT_DISCONNECT_PROTOCOL_ERROR;
136 break;
137 case ENOMEM:
138 param.reason_code = MQTT_DISCONNECT_PACKET_TOO_LARGE;
139 break;
140 default:
141 param.reason_code = MQTT_DISCONNECT_UNSPECIFIED_ERROR;
142 break;
143 }
144 }
145
146 tx_buf_init(client, &packet);
147
148 if (disconnect_encode(client, ¶m, &packet) < 0) {
149 return;
150 }
151
152 (void)client_write(client, packet.cur, packet.end - packet.cur);
153 }
154 #else
disconnect_5_0_notify(struct mqtt_client * client,int err)155 static void disconnect_5_0_notify(struct mqtt_client *client, int err)
156 {
157 ARG_UNUSED(client);
158 ARG_UNUSED(err);
159 }
160 #endif /* CONFIG_MQTT_VERSION_5_0 */
161
162
client_read(struct mqtt_client * client)163 static int client_read(struct mqtt_client *client)
164 {
165 int err_code;
166
167 if (client->internal.remaining_payload > 0) {
168 return -EBUSY;
169 }
170
171 err_code = mqtt_handle_rx(client);
172 if (err_code < 0) {
173 if (mqtt_is_version_5_0(client)) {
174 /* Best effort send, if it fails just shut the connection. */
175 disconnect_5_0_notify(client, -err_code);
176 }
177
178 mqtt_client_disconnect(client, err_code, true);
179 }
180
181 return err_code;
182 }
183
client_write(struct mqtt_client * client,const uint8_t * data,uint32_t datalen)184 static int client_write(struct mqtt_client *client, const uint8_t *data,
185 uint32_t datalen)
186 {
187 int err_code;
188
189 NET_DBG("[%p]: Transport writing %d bytes.", client, datalen);
190
191 err_code = mqtt_transport_write(client, data, datalen);
192 if (err_code < 0) {
193 NET_ERR("Transport write failed, err_code = %d, "
194 "closing connection", err_code);
195 mqtt_client_disconnect(client, err_code, true);
196 return err_code;
197 }
198
199 NET_DBG("[%p]: Transport write complete.", client);
200 client->internal.last_activity = mqtt_sys_tick_in_ms_get();
201
202 return 0;
203 }
204
client_write_msg(struct mqtt_client * client,const struct msghdr * message)205 static int client_write_msg(struct mqtt_client *client,
206 const struct msghdr *message)
207 {
208 int err_code;
209
210 NET_DBG("[%p]: Transport writing message.", client);
211
212 err_code = mqtt_transport_write_msg(client, message);
213 if (err_code < 0) {
214 NET_ERR("Transport write failed, err_code = %d, "
215 "closing connection", err_code);
216 mqtt_client_disconnect(client, err_code, true);
217 return err_code;
218 }
219
220 NET_DBG("[%p]: Transport write complete.", client);
221 client->internal.last_activity = mqtt_sys_tick_in_ms_get();
222
223 return 0;
224 }
225
mqtt_client_init(struct mqtt_client * client)226 void mqtt_client_init(struct mqtt_client *client)
227 {
228 NULL_PARAM_CHECK_VOID(client);
229
230 memset(client, 0, sizeof(*client));
231
232 MQTT_STATE_INIT(client);
233 mqtt_mutex_init(client);
234
235 client->protocol_version = IS_ENABLED(CONFIG_MQTT_VERSION_5_0) ?
236 MQTT_VERSION_5_0 : MQTT_VERSION_3_1_1;
237 client->clean_session = MQTT_CLEAN_SESSION;
238 client->keepalive = MQTT_KEEPALIVE;
239 }
240
241 #if defined(CONFIG_SOCKS)
mqtt_client_set_proxy(struct mqtt_client * client,struct sockaddr * proxy_addr,socklen_t addrlen)242 int mqtt_client_set_proxy(struct mqtt_client *client,
243 struct sockaddr *proxy_addr,
244 socklen_t addrlen)
245 {
246 if (IS_ENABLED(CONFIG_SOCKS)) {
247 if (!client || !proxy_addr) {
248 return -EINVAL;
249 }
250
251 client->transport.proxy.addrlen = addrlen;
252 memcpy(&client->transport.proxy.addr, proxy_addr, addrlen);
253
254 return 0;
255 }
256
257 return -ENOTSUP;
258 }
259 #endif
260
mqtt_connect(struct mqtt_client * client)261 int mqtt_connect(struct mqtt_client *client)
262 {
263 int err_code;
264
265 NULL_PARAM_CHECK(client);
266 NULL_PARAM_CHECK(client->client_id.utf8);
267
268 mqtt_mutex_lock(client);
269
270 if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) {
271 err_code = -ENOMEM;
272 goto error;
273 }
274
275 err_code = client_connect(client);
276
277 error:
278 if (err_code < 0) {
279 client_reset(client);
280 }
281
282 mqtt_mutex_unlock(client);
283
284 return err_code;
285 }
286
verify_tx_state(const struct mqtt_client * client)287 static int verify_tx_state(const struct mqtt_client *client)
288 {
289 if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) {
290 return -ENOTCONN;
291 }
292
293 return 0;
294 }
295
mqtt_publish(struct mqtt_client * client,const struct mqtt_publish_param * param)296 int mqtt_publish(struct mqtt_client *client,
297 const struct mqtt_publish_param *param)
298 {
299 int err_code;
300 struct buf_ctx packet;
301 struct iovec io_vector[2];
302 struct msghdr msg;
303
304 NULL_PARAM_CHECK(client);
305 NULL_PARAM_CHECK(param);
306
307 NET_DBG("[CID %p]:[State 0x%02x]: >> Topic size 0x%08x, "
308 "Data size 0x%08x", client, client->internal.state,
309 param->message.topic.topic.size,
310 param->message.payload.len);
311
312 mqtt_mutex_lock(client);
313
314 tx_buf_init(client, &packet);
315
316 err_code = verify_tx_state(client);
317 if (err_code < 0) {
318 goto error;
319 }
320
321 err_code = publish_encode(client, param, &packet);
322 if (err_code < 0) {
323 goto error;
324 }
325
326 io_vector[0].iov_base = packet.cur;
327 io_vector[0].iov_len = packet.end - packet.cur;
328 io_vector[1].iov_base = param->message.payload.data;
329 io_vector[1].iov_len = param->message.payload.len;
330
331 memset(&msg, 0, sizeof(msg));
332
333 msg.msg_iov = io_vector;
334 msg.msg_iovlen = ARRAY_SIZE(io_vector);
335
336 err_code = client_write_msg(client, &msg);
337
338 error:
339 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
340 client, client->internal.state, err_code);
341
342 mqtt_mutex_unlock(client);
343
344 return err_code;
345 }
346
mqtt_publish_qos1_ack(struct mqtt_client * client,const struct mqtt_puback_param * param)347 int mqtt_publish_qos1_ack(struct mqtt_client *client,
348 const struct mqtt_puback_param *param)
349 {
350 int err_code;
351 struct buf_ctx packet;
352
353 NULL_PARAM_CHECK(client);
354 NULL_PARAM_CHECK(param);
355
356 NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
357 client, client->internal.state, param->message_id);
358
359 mqtt_mutex_lock(client);
360
361 tx_buf_init(client, &packet);
362
363 err_code = verify_tx_state(client);
364 if (err_code < 0) {
365 goto error;
366 }
367
368 err_code = publish_ack_encode(client, param, &packet);
369 if (err_code < 0) {
370 goto error;
371 }
372
373 err_code = client_write(client, packet.cur, packet.end - packet.cur);
374
375 error:
376 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
377 client, client->internal.state, err_code);
378
379 mqtt_mutex_unlock(client);
380
381 return err_code;
382 }
383
mqtt_publish_qos2_receive(struct mqtt_client * client,const struct mqtt_pubrec_param * param)384 int mqtt_publish_qos2_receive(struct mqtt_client *client,
385 const struct mqtt_pubrec_param *param)
386 {
387 int err_code;
388 struct buf_ctx packet;
389
390 NULL_PARAM_CHECK(client);
391 NULL_PARAM_CHECK(param);
392
393 NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
394 client, client->internal.state, param->message_id);
395
396 mqtt_mutex_lock(client);
397
398 tx_buf_init(client, &packet);
399
400 err_code = verify_tx_state(client);
401 if (err_code < 0) {
402 goto error;
403 }
404
405 err_code = publish_receive_encode(client, param, &packet);
406 if (err_code < 0) {
407 goto error;
408 }
409
410 err_code = client_write(client, packet.cur, packet.end - packet.cur);
411
412 error:
413 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
414 client, client->internal.state, err_code);
415
416 mqtt_mutex_unlock(client);
417
418 return err_code;
419 }
420
mqtt_publish_qos2_release(struct mqtt_client * client,const struct mqtt_pubrel_param * param)421 int mqtt_publish_qos2_release(struct mqtt_client *client,
422 const struct mqtt_pubrel_param *param)
423 {
424 int err_code;
425 struct buf_ctx packet;
426
427 NULL_PARAM_CHECK(client);
428 NULL_PARAM_CHECK(param);
429
430 NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
431 client, client->internal.state, param->message_id);
432
433 mqtt_mutex_lock(client);
434
435 tx_buf_init(client, &packet);
436
437 err_code = verify_tx_state(client);
438 if (err_code < 0) {
439 goto error;
440 }
441
442 err_code = publish_release_encode(client, param, &packet);
443 if (err_code < 0) {
444 goto error;
445 }
446
447 err_code = client_write(client, packet.cur, packet.end - packet.cur);
448
449 error:
450 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
451 client, client->internal.state, err_code);
452
453 mqtt_mutex_unlock(client);
454
455 return err_code;
456 }
457
mqtt_publish_qos2_complete(struct mqtt_client * client,const struct mqtt_pubcomp_param * param)458 int mqtt_publish_qos2_complete(struct mqtt_client *client,
459 const struct mqtt_pubcomp_param *param)
460 {
461 int err_code;
462 struct buf_ctx packet;
463
464 NULL_PARAM_CHECK(client);
465 NULL_PARAM_CHECK(param);
466
467 NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
468 client, client->internal.state, param->message_id);
469
470 mqtt_mutex_lock(client);
471
472 tx_buf_init(client, &packet);
473
474 err_code = verify_tx_state(client);
475 if (err_code < 0) {
476 goto error;
477 }
478
479 err_code = publish_complete_encode(client, param, &packet);
480 if (err_code < 0) {
481 goto error;
482 }
483
484 err_code = client_write(client, packet.cur, packet.end - packet.cur);
485 if (err_code < 0) {
486 goto error;
487 }
488
489 error:
490 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
491 client, client->internal.state, err_code);
492
493 mqtt_mutex_unlock(client);
494
495 return err_code;
496 }
497
mqtt_disconnect(struct mqtt_client * client,const struct mqtt_disconnect_param * param)498 int mqtt_disconnect(struct mqtt_client *client,
499 const struct mqtt_disconnect_param *param)
500 {
501 int err_code;
502 struct buf_ctx packet;
503
504 NULL_PARAM_CHECK(client);
505
506 mqtt_mutex_lock(client);
507
508 tx_buf_init(client, &packet);
509
510 err_code = verify_tx_state(client);
511 if (err_code < 0) {
512 goto error;
513 }
514
515 err_code = disconnect_encode(client, param, &packet);
516 if (err_code < 0) {
517 goto error;
518 }
519
520 err_code = client_write(client, packet.cur, packet.end - packet.cur);
521 if (err_code < 0) {
522 goto error;
523 }
524
525 mqtt_client_disconnect(client, 0, true);
526
527 error:
528 mqtt_mutex_unlock(client);
529
530 return err_code;
531 }
532
mqtt_subscribe(struct mqtt_client * client,const struct mqtt_subscription_list * param)533 int mqtt_subscribe(struct mqtt_client *client,
534 const struct mqtt_subscription_list *param)
535 {
536 int err_code;
537 struct buf_ctx packet;
538
539 NULL_PARAM_CHECK(client);
540 NULL_PARAM_CHECK(param);
541
542 NET_DBG("[CID %p]:[State 0x%02x]: >> message id 0x%04x "
543 "topic count 0x%04x", client, client->internal.state,
544 param->message_id, param->list_count);
545
546 mqtt_mutex_lock(client);
547
548 tx_buf_init(client, &packet);
549
550 err_code = verify_tx_state(client);
551 if (err_code < 0) {
552 goto error;
553 }
554
555 err_code = subscribe_encode(client, param, &packet);
556 if (err_code < 0) {
557 goto error;
558 }
559
560 err_code = client_write(client, packet.cur, packet.end - packet.cur);
561
562 error:
563 NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
564 client, client->internal.state, err_code);
565
566 mqtt_mutex_unlock(client);
567
568 return err_code;
569 }
570
mqtt_unsubscribe(struct mqtt_client * client,const struct mqtt_subscription_list * param)571 int mqtt_unsubscribe(struct mqtt_client *client,
572 const struct mqtt_subscription_list *param)
573 {
574 int err_code;
575 struct buf_ctx packet;
576
577 NULL_PARAM_CHECK(client);
578 NULL_PARAM_CHECK(param);
579
580 mqtt_mutex_lock(client);
581
582 tx_buf_init(client, &packet);
583
584 err_code = verify_tx_state(client);
585 if (err_code < 0) {
586 goto error;
587 }
588
589 err_code = unsubscribe_encode(client, param, &packet);
590 if (err_code < 0) {
591 goto error;
592 }
593
594 err_code = client_write(client, packet.cur, packet.end - packet.cur);
595
596 error:
597 mqtt_mutex_unlock(client);
598
599 return err_code;
600 }
601
mqtt_ping(struct mqtt_client * client)602 int mqtt_ping(struct mqtt_client *client)
603 {
604 int err_code;
605 struct buf_ctx packet;
606
607 NULL_PARAM_CHECK(client);
608
609 mqtt_mutex_lock(client);
610
611 tx_buf_init(client, &packet);
612
613 err_code = verify_tx_state(client);
614 if (err_code < 0) {
615 goto error;
616 }
617
618 err_code = ping_request_encode(&packet);
619 if (err_code < 0) {
620 goto error;
621 }
622
623 err_code = client_write(client, packet.cur, packet.end - packet.cur);
624
625 if (client->unacked_ping >= INT8_MAX) {
626 NET_WARN("PING count overflow!");
627 } else {
628 client->unacked_ping++;
629 }
630
631 error:
632 mqtt_mutex_unlock(client);
633
634 return err_code;
635 }
636
637 #if defined(CONFIG_MQTT_VERSION_5_0)
verify_auth_state(const struct mqtt_client * client)638 static int verify_auth_state(const struct mqtt_client *client)
639 {
640 /* Enhanced authentication is only allowed when connecting at MQTT
641 * level (before CONNACK is received).
642 */
643 if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED) &&
644 !MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) {
645 return 0;
646 }
647
648 /* Return generic protocol error */
649 return -EPROTO;
650 }
651
mqtt_auth(struct mqtt_client * client,const struct mqtt_auth_param * param)652 int mqtt_auth(struct mqtt_client *client, const struct mqtt_auth_param *param)
653 {
654 int err_code;
655 struct buf_ctx packet;
656
657 NULL_PARAM_CHECK(client);
658 NULL_PARAM_CHECK(param);
659
660 mqtt_mutex_lock(client);
661
662 if (!mqtt_is_version_5_0(client)) {
663 NET_ERR("Auth packet only supported in MQTT 5.0");
664 err_code = -ENOTSUP;
665 goto error;
666 }
667
668 tx_buf_init(client, &packet);
669
670 err_code = verify_auth_state(client);
671 if (err_code < 0) {
672 goto error;
673 }
674
675 err_code = auth_encode(param, &packet);
676 if (err_code < 0) {
677 goto error;
678 }
679
680 err_code = client_write(client, packet.cur, packet.end - packet.cur);
681
682 error:
683 mqtt_mutex_unlock(client);
684
685 return err_code;
686 }
687 #endif /* CONFIG_MQTT_VERSION_5_0 */
688
mqtt_abort(struct mqtt_client * client)689 int mqtt_abort(struct mqtt_client *client)
690 {
691 NULL_PARAM_CHECK(client);
692
693 mqtt_mutex_lock(client);
694
695 if (client->internal.state != MQTT_STATE_IDLE) {
696 mqtt_client_disconnect(client, -ECONNABORTED, true);
697 }
698
699 mqtt_mutex_unlock(client);
700
701 return 0;
702 }
703
mqtt_live(struct mqtt_client * client)704 int mqtt_live(struct mqtt_client *client)
705 {
706 int err_code = 0;
707 uint32_t elapsed_time;
708 bool ping_sent = false;
709
710 NULL_PARAM_CHECK(client);
711
712 mqtt_mutex_lock(client);
713
714 elapsed_time = mqtt_elapsed_time_in_ms_get(
715 client->internal.last_activity);
716 if ((client->keepalive > 0) &&
717 (elapsed_time >= (client->keepalive * 1000))) {
718 err_code = mqtt_ping(client);
719 ping_sent = true;
720 }
721
722 mqtt_mutex_unlock(client);
723
724 if (ping_sent) {
725 return err_code;
726 } else {
727 return -EAGAIN;
728 }
729 }
730
mqtt_keepalive_time_left(const struct mqtt_client * client)731 int mqtt_keepalive_time_left(const struct mqtt_client *client)
732 {
733 uint32_t elapsed_time = mqtt_elapsed_time_in_ms_get(
734 client->internal.last_activity);
735 uint32_t keepalive_ms = 1000U * client->keepalive;
736
737 if (client->keepalive == 0) {
738 /* Keep alive not enabled. */
739 return -1;
740 }
741
742 if (keepalive_ms <= elapsed_time) {
743 return 0;
744 }
745
746 return keepalive_ms - elapsed_time;
747 }
748
mqtt_input(struct mqtt_client * client)749 int mqtt_input(struct mqtt_client *client)
750 {
751 int err_code = 0;
752
753 NULL_PARAM_CHECK(client);
754
755 mqtt_mutex_lock(client);
756
757 NET_DBG("state:0x%08x", client->internal.state);
758
759 if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) {
760 err_code = client_read(client);
761 } else {
762 err_code = -ENOTCONN;
763 }
764
765 mqtt_mutex_unlock(client);
766
767 return err_code;
768 }
769
read_publish_payload(struct mqtt_client * client,void * buffer,size_t length,bool shall_block)770 static int read_publish_payload(struct mqtt_client *client, void *buffer,
771 size_t length, bool shall_block)
772 {
773 int ret;
774
775 NULL_PARAM_CHECK(client);
776
777 mqtt_mutex_lock(client);
778
779 if (client->internal.remaining_payload == 0U) {
780 ret = 0;
781 goto exit;
782 }
783
784 if (client->internal.remaining_payload < length) {
785 length = client->internal.remaining_payload;
786 }
787
788 ret = mqtt_transport_read(client, buffer, length, shall_block);
789 if (!shall_block && ret == -EAGAIN) {
790 goto exit;
791 }
792
793 if (ret <= 0) {
794 if (ret == 0) {
795 ret = -ENOTCONN;
796 }
797
798 mqtt_client_disconnect(client, ret, true);
799 goto exit;
800 }
801
802 client->internal.remaining_payload -= ret;
803
804 exit:
805 mqtt_mutex_unlock(client);
806
807 return ret;
808 }
809
mqtt_read_publish_payload(struct mqtt_client * client,void * buffer,size_t length)810 int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
811 size_t length)
812 {
813 return read_publish_payload(client, buffer, length, false);
814 }
815
mqtt_read_publish_payload_blocking(struct mqtt_client * client,void * buffer,size_t length)816 int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
817 size_t length)
818 {
819 return read_publish_payload(client, buffer, length, true);
820 }
821
mqtt_readall_publish_payload(struct mqtt_client * client,uint8_t * buffer,size_t length)822 int mqtt_readall_publish_payload(struct mqtt_client *client, uint8_t *buffer,
823 size_t length)
824 {
825 uint8_t *end = buffer + length;
826
827 while (buffer < end) {
828 int ret = mqtt_read_publish_payload_blocking(client, buffer,
829 end - buffer);
830
831 if (ret < 0) {
832 return ret;
833 } else if (ret == 0) {
834 return -EIO;
835 }
836
837 buffer += ret;
838 }
839
840 return 0;
841 }
842