1 /*
2  * Copyright (c) 2018 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /** @file mqtt.c
8  *
9  * @brief MQTT Client API Implementation.
10  */
11 
12 #include <zephyr/logging/log.h>
13 LOG_MODULE_REGISTER(net_mqtt, CONFIG_MQTT_LOG_LEVEL);
14 
15 #include <zephyr/net/mqtt.h>
16 
17 #include "mqtt_transport.h"
18 #include "mqtt_internal.h"
19 #include "mqtt_os.h"
20 
client_reset(struct mqtt_client * client)21 static void client_reset(struct mqtt_client *client)
22 {
23 	MQTT_STATE_INIT(client);
24 
25 	client->internal.last_activity = 0U;
26 	client->internal.rx_buf_datalen = 0U;
27 	client->internal.remaining_payload = 0U;
28 }
29 
30 /** @brief Initialize tx buffer. */
tx_buf_init(struct mqtt_client * client,struct buf_ctx * buf)31 static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf)
32 {
33 	memset(client->tx_buf, 0, client->tx_buf_size);
34 	buf->cur = client->tx_buf;
35 	buf->end = client->tx_buf + client->tx_buf_size;
36 }
37 
event_notify(struct mqtt_client * client,const struct mqtt_evt * evt)38 void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt)
39 {
40 	if (client->evt_cb != NULL) {
41 		mqtt_mutex_unlock(client);
42 
43 		client->evt_cb(client, evt);
44 
45 		mqtt_mutex_lock(client);
46 	}
47 }
48 
mqtt_client_disconnect(struct mqtt_client * client,int result,bool notify)49 void mqtt_client_disconnect(struct mqtt_client *client, int result, bool notify)
50 {
51 	int err_code;
52 
53 	err_code = mqtt_transport_disconnect(client);
54 	if (err_code < 0) {
55 		NET_ERR("Failed to disconnect transport!");
56 	}
57 
58 	/* Reset internal state. */
59 	client_reset(client);
60 
61 	if (notify) {
62 		struct mqtt_evt evt = {
63 			.type = MQTT_EVT_DISCONNECT,
64 			.result = result,
65 		};
66 
67 		/* Notify application. */
68 		event_notify(client, &evt);
69 	}
70 }
71 
client_connect(struct mqtt_client * client)72 static int client_connect(struct mqtt_client *client)
73 {
74 	int err_code;
75 	struct buf_ctx packet;
76 
77 	err_code = mqtt_transport_connect(client);
78 	if (err_code < 0) {
79 		return err_code;
80 	}
81 
82 	tx_buf_init(client, &packet);
83 	MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED);
84 
85 	err_code = connect_request_encode(client, &packet);
86 	if (err_code < 0) {
87 		goto error;
88 	}
89 
90 	/* Send MQTT identification message to broker. */
91 	err_code = mqtt_transport_write(client, packet.cur,
92 					packet.end - packet.cur);
93 	if (err_code < 0) {
94 		goto error;
95 	}
96 
97 	client->internal.last_activity = mqtt_sys_tick_in_ms_get();
98 
99 	/* Reset the unanswered ping count for a new connection */
100 	client->unacked_ping = 0;
101 
102 	NET_INFO("Connect completed");
103 
104 	return 0;
105 
106 error:
107 	mqtt_client_disconnect(client, err_code, false);
108 	return err_code;
109 }
110 
/* Forward declaration: client_write() is defined further down in this file. */
static int client_write(struct mqtt_client *client, const uint8_t *data,
			uint32_t datalen);
113 
#if defined(CONFIG_MQTT_VERSION_5_0)
/** @brief Best-effort DISCONNECT towards the broker after a local failure.
 *
 * The reason code is taken from client->internal.disconnect_reason when that
 * differs from MQTT_DISCONNECT_NORMAL; otherwise it is derived from @p err.
 * For ECONNREFUSED and ENOTCONN no packet is sent at all.
 *
 * The packet is written directly to the transport and any failure is
 * ignored. Going through client_write() would, on a write error, already
 * disconnect the client and emit MQTT_EVT_DISCONNECT, and client_read()
 * disconnects and notifies again right after calling this function. The
 * application would then see the teardown and the event twice.
 *
 * @param client Client instance.
 * @param err    Positive errno value describing the failure.
 */
static void disconnect_5_0_notify(struct mqtt_client *client, int err)
{
	struct mqtt_disconnect_param param = { };
	struct buf_ctx packet;
	int rc;

	/* Parser might've set custom failure reason, in such case skip generic
	 * mapping from errno to reason code.
	 */
	if (client->internal.disconnect_reason != MQTT_DISCONNECT_NORMAL) {
		param.reason_code = client->internal.disconnect_reason;
	} else {
		switch (err) {
		case ECONNREFUSED:
		case ENOTCONN:
			/* Connection rejected/closed, skip disconnect. */
			return;
		case EINVAL:
			param.reason_code = MQTT_DISCONNECT_MALFORMED_PACKET;
			break;
		case EBADMSG:
			param.reason_code = MQTT_DISCONNECT_PROTOCOL_ERROR;
			break;
		case ENOMEM:
			param.reason_code = MQTT_DISCONNECT_PACKET_TOO_LARGE;
			break;
		default:
			param.reason_code = MQTT_DISCONNECT_UNSPECIFIED_ERROR;
			break;
		}
	}

	tx_buf_init(client, &packet);

	if (disconnect_encode(client, &param, &packet) < 0) {
		return;
	}

	/* Best effort only: the caller shuts the connection down right after
	 * this, so a failed write must not trigger a teardown of its own.
	 */
	rc = mqtt_transport_write(client, packet.cur, packet.end - packet.cur);
	if (rc < 0) {
		NET_DBG("[%p]: Best-effort DISCONNECT not sent, err_code = %d",
			client, rc);
	}
}
#else
/** @brief No-op stand-in used when MQTT 5.0 support is compiled out. */
static void disconnect_5_0_notify(struct mqtt_client *client, int err)
{
	ARG_UNUSED(client);
	ARG_UNUSED(err);
}
#endif /* CONFIG_MQTT_VERSION_5_0 */
161 
162 
client_read(struct mqtt_client * client)163 static int client_read(struct mqtt_client *client)
164 {
165 	int err_code;
166 
167 	if (client->internal.remaining_payload > 0) {
168 		return -EBUSY;
169 	}
170 
171 	err_code = mqtt_handle_rx(client);
172 	if (err_code < 0) {
173 		if (mqtt_is_version_5_0(client)) {
174 			/* Best effort send, if it fails just shut the connection. */
175 			disconnect_5_0_notify(client, -err_code);
176 		}
177 
178 		mqtt_client_disconnect(client, err_code, true);
179 	}
180 
181 	return err_code;
182 }
183 
client_write(struct mqtt_client * client,const uint8_t * data,uint32_t datalen)184 static int client_write(struct mqtt_client *client, const uint8_t *data,
185 			uint32_t datalen)
186 {
187 	int err_code;
188 
189 	NET_DBG("[%p]: Transport writing %d bytes.", client, datalen);
190 
191 	err_code = mqtt_transport_write(client, data, datalen);
192 	if (err_code < 0) {
193 		NET_ERR("Transport write failed, err_code = %d, "
194 			 "closing connection", err_code);
195 		mqtt_client_disconnect(client, err_code, true);
196 		return err_code;
197 	}
198 
199 	NET_DBG("[%p]: Transport write complete.", client);
200 	client->internal.last_activity = mqtt_sys_tick_in_ms_get();
201 
202 	return 0;
203 }
204 
client_write_msg(struct mqtt_client * client,const struct msghdr * message)205 static int client_write_msg(struct mqtt_client *client,
206 			    const struct msghdr *message)
207 {
208 	int err_code;
209 
210 	NET_DBG("[%p]: Transport writing message.", client);
211 
212 	err_code = mqtt_transport_write_msg(client, message);
213 	if (err_code < 0) {
214 		NET_ERR("Transport write failed, err_code = %d, "
215 			 "closing connection", err_code);
216 		mqtt_client_disconnect(client, err_code, true);
217 		return err_code;
218 	}
219 
220 	NET_DBG("[%p]: Transport write complete.", client);
221 	client->internal.last_activity = mqtt_sys_tick_in_ms_get();
222 
223 	return 0;
224 }
225 
mqtt_client_init(struct mqtt_client * client)226 void mqtt_client_init(struct mqtt_client *client)
227 {
228 	NULL_PARAM_CHECK_VOID(client);
229 
230 	memset(client, 0, sizeof(*client));
231 
232 	MQTT_STATE_INIT(client);
233 	mqtt_mutex_init(client);
234 
235 	client->protocol_version = IS_ENABLED(CONFIG_MQTT_VERSION_5_0) ?
236 				   MQTT_VERSION_5_0 : MQTT_VERSION_3_1_1;
237 	client->clean_session = MQTT_CLEAN_SESSION;
238 	client->keepalive = MQTT_KEEPALIVE;
239 }
240 
#if defined(CONFIG_SOCKS)
/* Store the proxy address the transport should use.
 *
 * @param client     Client instance; must not be NULL.
 * @param proxy_addr Proxy address to copy; must not be NULL.
 * @param addrlen    Number of bytes to copy from proxy_addr. Must not exceed
 *                   the size of client->transport.proxy.addr.
 *
 * @return 0 on success, -EINVAL on a NULL argument or an address length that
 *         does not fit into the client's proxy address storage.
 */
int mqtt_client_set_proxy(struct mqtt_client *client,
			  struct sockaddr *proxy_addr,
			  socklen_t addrlen)
{
	if ((client == NULL) || (proxy_addr == NULL)) {
		return -EINVAL;
	}

	/* The length comes from the caller; copying more than the destination
	 * can hold would overwrite whatever follows it in the client struct.
	 */
	if (addrlen > sizeof(client->transport.proxy.addr)) {
		return -EINVAL;
	}

	client->transport.proxy.addrlen = addrlen;
	memcpy(&client->transport.proxy.addr, proxy_addr, addrlen);

	return 0;
}
#endif
260 
mqtt_connect(struct mqtt_client * client)261 int mqtt_connect(struct mqtt_client *client)
262 {
263 	int err_code;
264 
265 	NULL_PARAM_CHECK(client);
266 	NULL_PARAM_CHECK(client->client_id.utf8);
267 
268 	mqtt_mutex_lock(client);
269 
270 	if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) {
271 		err_code = -ENOMEM;
272 		goto error;
273 	}
274 
275 	err_code = client_connect(client);
276 
277 error:
278 	if (err_code < 0) {
279 		client_reset(client);
280 	}
281 
282 	mqtt_mutex_unlock(client);
283 
284 	return err_code;
285 }
286 
verify_tx_state(const struct mqtt_client * client)287 static int verify_tx_state(const struct mqtt_client *client)
288 {
289 	if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) {
290 		return -ENOTCONN;
291 	}
292 
293 	return 0;
294 }
295 
mqtt_publish(struct mqtt_client * client,const struct mqtt_publish_param * param)296 int mqtt_publish(struct mqtt_client *client,
297 		 const struct mqtt_publish_param *param)
298 {
299 	int err_code;
300 	struct buf_ctx packet;
301 	struct iovec io_vector[2];
302 	struct msghdr msg;
303 
304 	NULL_PARAM_CHECK(client);
305 	NULL_PARAM_CHECK(param);
306 
307 	NET_DBG("[CID %p]:[State 0x%02x]: >> Topic size 0x%08x, "
308 		 "Data size 0x%08x", client, client->internal.state,
309 		 param->message.topic.topic.size,
310 		 param->message.payload.len);
311 
312 	mqtt_mutex_lock(client);
313 
314 	tx_buf_init(client, &packet);
315 
316 	err_code = verify_tx_state(client);
317 	if (err_code < 0) {
318 		goto error;
319 	}
320 
321 	err_code = publish_encode(client, param, &packet);
322 	if (err_code < 0) {
323 		goto error;
324 	}
325 
326 	io_vector[0].iov_base = packet.cur;
327 	io_vector[0].iov_len = packet.end - packet.cur;
328 	io_vector[1].iov_base = param->message.payload.data;
329 	io_vector[1].iov_len = param->message.payload.len;
330 
331 	memset(&msg, 0, sizeof(msg));
332 
333 	msg.msg_iov = io_vector;
334 	msg.msg_iovlen = ARRAY_SIZE(io_vector);
335 
336 	err_code = client_write_msg(client, &msg);
337 
338 error:
339 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
340 			 client, client->internal.state, err_code);
341 
342 	mqtt_mutex_unlock(client);
343 
344 	return err_code;
345 }
346 
mqtt_publish_qos1_ack(struct mqtt_client * client,const struct mqtt_puback_param * param)347 int mqtt_publish_qos1_ack(struct mqtt_client *client,
348 			  const struct mqtt_puback_param *param)
349 {
350 	int err_code;
351 	struct buf_ctx packet;
352 
353 	NULL_PARAM_CHECK(client);
354 	NULL_PARAM_CHECK(param);
355 
356 	NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
357 		 client, client->internal.state, param->message_id);
358 
359 	mqtt_mutex_lock(client);
360 
361 	tx_buf_init(client, &packet);
362 
363 	err_code = verify_tx_state(client);
364 	if (err_code < 0) {
365 		goto error;
366 	}
367 
368 	err_code = publish_ack_encode(client, param, &packet);
369 	if (err_code < 0) {
370 		goto error;
371 	}
372 
373 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
374 
375 error:
376 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
377 		 client, client->internal.state, err_code);
378 
379 	mqtt_mutex_unlock(client);
380 
381 	return err_code;
382 }
383 
mqtt_publish_qos2_receive(struct mqtt_client * client,const struct mqtt_pubrec_param * param)384 int mqtt_publish_qos2_receive(struct mqtt_client *client,
385 			      const struct mqtt_pubrec_param *param)
386 {
387 	int err_code;
388 	struct buf_ctx packet;
389 
390 	NULL_PARAM_CHECK(client);
391 	NULL_PARAM_CHECK(param);
392 
393 	NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
394 		 client, client->internal.state, param->message_id);
395 
396 	mqtt_mutex_lock(client);
397 
398 	tx_buf_init(client, &packet);
399 
400 	err_code = verify_tx_state(client);
401 	if (err_code < 0) {
402 		goto error;
403 	}
404 
405 	err_code = publish_receive_encode(client, param, &packet);
406 	if (err_code < 0) {
407 		goto error;
408 	}
409 
410 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
411 
412 error:
413 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
414 		 client, client->internal.state, err_code);
415 
416 	mqtt_mutex_unlock(client);
417 
418 	return err_code;
419 }
420 
mqtt_publish_qos2_release(struct mqtt_client * client,const struct mqtt_pubrel_param * param)421 int mqtt_publish_qos2_release(struct mqtt_client *client,
422 			      const struct mqtt_pubrel_param *param)
423 {
424 	int err_code;
425 	struct buf_ctx packet;
426 
427 	NULL_PARAM_CHECK(client);
428 	NULL_PARAM_CHECK(param);
429 
430 	NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
431 		 client, client->internal.state, param->message_id);
432 
433 	mqtt_mutex_lock(client);
434 
435 	tx_buf_init(client, &packet);
436 
437 	err_code = verify_tx_state(client);
438 	if (err_code < 0) {
439 		goto error;
440 	}
441 
442 	err_code = publish_release_encode(client, param, &packet);
443 	if (err_code < 0) {
444 		goto error;
445 	}
446 
447 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
448 
449 error:
450 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
451 		 client, client->internal.state, err_code);
452 
453 	mqtt_mutex_unlock(client);
454 
455 	return err_code;
456 }
457 
mqtt_publish_qos2_complete(struct mqtt_client * client,const struct mqtt_pubcomp_param * param)458 int mqtt_publish_qos2_complete(struct mqtt_client *client,
459 			       const struct mqtt_pubcomp_param *param)
460 {
461 	int err_code;
462 	struct buf_ctx packet;
463 
464 	NULL_PARAM_CHECK(client);
465 	NULL_PARAM_CHECK(param);
466 
467 	NET_DBG("[CID %p]:[State 0x%02x]: >> Message id 0x%04x",
468 		 client, client->internal.state, param->message_id);
469 
470 	mqtt_mutex_lock(client);
471 
472 	tx_buf_init(client, &packet);
473 
474 	err_code = verify_tx_state(client);
475 	if (err_code < 0) {
476 		goto error;
477 	}
478 
479 	err_code = publish_complete_encode(client, param, &packet);
480 	if (err_code < 0) {
481 		goto error;
482 	}
483 
484 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
485 	if (err_code < 0) {
486 		goto error;
487 	}
488 
489 error:
490 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
491 		 client, client->internal.state, err_code);
492 
493 	mqtt_mutex_unlock(client);
494 
495 	return err_code;
496 }
497 
mqtt_disconnect(struct mqtt_client * client,const struct mqtt_disconnect_param * param)498 int mqtt_disconnect(struct mqtt_client *client,
499 		    const struct mqtt_disconnect_param *param)
500 {
501 	int err_code;
502 	struct buf_ctx packet;
503 
504 	NULL_PARAM_CHECK(client);
505 
506 	mqtt_mutex_lock(client);
507 
508 	tx_buf_init(client, &packet);
509 
510 	err_code = verify_tx_state(client);
511 	if (err_code < 0) {
512 		goto error;
513 	}
514 
515 	err_code = disconnect_encode(client, param, &packet);
516 	if (err_code < 0) {
517 		goto error;
518 	}
519 
520 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
521 	if (err_code < 0) {
522 		goto error;
523 	}
524 
525 	mqtt_client_disconnect(client, 0, true);
526 
527 error:
528 	mqtt_mutex_unlock(client);
529 
530 	return err_code;
531 }
532 
mqtt_subscribe(struct mqtt_client * client,const struct mqtt_subscription_list * param)533 int mqtt_subscribe(struct mqtt_client *client,
534 		   const struct mqtt_subscription_list *param)
535 {
536 	int err_code;
537 	struct buf_ctx packet;
538 
539 	NULL_PARAM_CHECK(client);
540 	NULL_PARAM_CHECK(param);
541 
542 	NET_DBG("[CID %p]:[State 0x%02x]: >> message id 0x%04x "
543 		 "topic count 0x%04x", client, client->internal.state,
544 		 param->message_id, param->list_count);
545 
546 	mqtt_mutex_lock(client);
547 
548 	tx_buf_init(client, &packet);
549 
550 	err_code = verify_tx_state(client);
551 	if (err_code < 0) {
552 		goto error;
553 	}
554 
555 	err_code = subscribe_encode(client, param, &packet);
556 	if (err_code < 0) {
557 		goto error;
558 	}
559 
560 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
561 
562 error:
563 	NET_DBG("[CID %p]:[State 0x%02x]: << result 0x%08x",
564 		 client, client->internal.state, err_code);
565 
566 	mqtt_mutex_unlock(client);
567 
568 	return err_code;
569 }
570 
mqtt_unsubscribe(struct mqtt_client * client,const struct mqtt_subscription_list * param)571 int mqtt_unsubscribe(struct mqtt_client *client,
572 		     const struct mqtt_subscription_list *param)
573 {
574 	int err_code;
575 	struct buf_ctx packet;
576 
577 	NULL_PARAM_CHECK(client);
578 	NULL_PARAM_CHECK(param);
579 
580 	mqtt_mutex_lock(client);
581 
582 	tx_buf_init(client, &packet);
583 
584 	err_code = verify_tx_state(client);
585 	if (err_code < 0) {
586 		goto error;
587 	}
588 
589 	err_code = unsubscribe_encode(client, param, &packet);
590 	if (err_code < 0) {
591 		goto error;
592 	}
593 
594 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
595 
596 error:
597 	mqtt_mutex_unlock(client);
598 
599 	return err_code;
600 }
601 
mqtt_ping(struct mqtt_client * client)602 int mqtt_ping(struct mqtt_client *client)
603 {
604 	int err_code;
605 	struct buf_ctx packet;
606 
607 	NULL_PARAM_CHECK(client);
608 
609 	mqtt_mutex_lock(client);
610 
611 	tx_buf_init(client, &packet);
612 
613 	err_code = verify_tx_state(client);
614 	if (err_code < 0) {
615 		goto error;
616 	}
617 
618 	err_code = ping_request_encode(&packet);
619 	if (err_code < 0) {
620 		goto error;
621 	}
622 
623 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
624 
625 	if (client->unacked_ping >= INT8_MAX) {
626 		NET_WARN("PING count overflow!");
627 	} else {
628 		client->unacked_ping++;
629 	}
630 
631 error:
632 	mqtt_mutex_unlock(client);
633 
634 	return err_code;
635 }
636 
637 #if defined(CONFIG_MQTT_VERSION_5_0)
verify_auth_state(const struct mqtt_client * client)638 static int verify_auth_state(const struct mqtt_client *client)
639 {
640 	/* Enhanced authentication is only allowed when connecting at MQTT
641 	 * level (before CONNACK is received).
642 	 */
643 	if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED) &&
644 	    !MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) {
645 		return 0;
646 	}
647 
648 	/* Return generic protocol error */
649 	return -EPROTO;
650 }
651 
mqtt_auth(struct mqtt_client * client,const struct mqtt_auth_param * param)652 int mqtt_auth(struct mqtt_client *client, const struct mqtt_auth_param *param)
653 {
654 	int err_code;
655 	struct buf_ctx packet;
656 
657 	NULL_PARAM_CHECK(client);
658 	NULL_PARAM_CHECK(param);
659 
660 	mqtt_mutex_lock(client);
661 
662 	if (!mqtt_is_version_5_0(client)) {
663 		NET_ERR("Auth packet only supported in MQTT 5.0");
664 		err_code = -ENOTSUP;
665 		goto error;
666 	}
667 
668 	tx_buf_init(client, &packet);
669 
670 	err_code = verify_auth_state(client);
671 	if (err_code < 0) {
672 		goto error;
673 	}
674 
675 	err_code = auth_encode(param, &packet);
676 	if (err_code < 0) {
677 		goto error;
678 	}
679 
680 	err_code = client_write(client, packet.cur, packet.end - packet.cur);
681 
682 error:
683 	mqtt_mutex_unlock(client);
684 
685 	return err_code;
686 }
687 #endif /* CONFIG_MQTT_VERSION_5_0 */
688 
mqtt_abort(struct mqtt_client * client)689 int mqtt_abort(struct mqtt_client *client)
690 {
691 	NULL_PARAM_CHECK(client);
692 
693 	mqtt_mutex_lock(client);
694 
695 	if (client->internal.state != MQTT_STATE_IDLE) {
696 		mqtt_client_disconnect(client, -ECONNABORTED, true);
697 	}
698 
699 	mqtt_mutex_unlock(client);
700 
701 	return 0;
702 }
703 
mqtt_live(struct mqtt_client * client)704 int mqtt_live(struct mqtt_client *client)
705 {
706 	int err_code = 0;
707 	uint32_t elapsed_time;
708 	bool ping_sent = false;
709 
710 	NULL_PARAM_CHECK(client);
711 
712 	mqtt_mutex_lock(client);
713 
714 	elapsed_time = mqtt_elapsed_time_in_ms_get(
715 				client->internal.last_activity);
716 	if ((client->keepalive > 0) &&
717 	    (elapsed_time >= (client->keepalive * 1000))) {
718 		err_code = mqtt_ping(client);
719 		ping_sent = true;
720 	}
721 
722 	mqtt_mutex_unlock(client);
723 
724 	if (ping_sent) {
725 		return err_code;
726 	} else {
727 		return -EAGAIN;
728 	}
729 }
730 
mqtt_keepalive_time_left(const struct mqtt_client * client)731 int mqtt_keepalive_time_left(const struct mqtt_client *client)
732 {
733 	uint32_t elapsed_time = mqtt_elapsed_time_in_ms_get(
734 					client->internal.last_activity);
735 	uint32_t keepalive_ms = 1000U * client->keepalive;
736 
737 	if (client->keepalive == 0) {
738 		/* Keep alive not enabled. */
739 		return -1;
740 	}
741 
742 	if (keepalive_ms <= elapsed_time) {
743 		return 0;
744 	}
745 
746 	return keepalive_ms - elapsed_time;
747 }
748 
mqtt_input(struct mqtt_client * client)749 int mqtt_input(struct mqtt_client *client)
750 {
751 	int err_code = 0;
752 
753 	NULL_PARAM_CHECK(client);
754 
755 	mqtt_mutex_lock(client);
756 
757 	NET_DBG("state:0x%08x", client->internal.state);
758 
759 	if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) {
760 		err_code = client_read(client);
761 	} else {
762 		err_code = -ENOTCONN;
763 	}
764 
765 	mqtt_mutex_unlock(client);
766 
767 	return err_code;
768 }
769 
read_publish_payload(struct mqtt_client * client,void * buffer,size_t length,bool shall_block)770 static int read_publish_payload(struct mqtt_client *client, void *buffer,
771 				size_t length, bool shall_block)
772 {
773 	int ret;
774 
775 	NULL_PARAM_CHECK(client);
776 
777 	mqtt_mutex_lock(client);
778 
779 	if (client->internal.remaining_payload == 0U) {
780 		ret = 0;
781 		goto exit;
782 	}
783 
784 	if (client->internal.remaining_payload < length) {
785 		length = client->internal.remaining_payload;
786 	}
787 
788 	ret = mqtt_transport_read(client, buffer, length, shall_block);
789 	if (!shall_block && ret == -EAGAIN) {
790 		goto exit;
791 	}
792 
793 	if (ret <= 0) {
794 		if (ret == 0) {
795 			ret = -ENOTCONN;
796 		}
797 
798 		mqtt_client_disconnect(client, ret, true);
799 		goto exit;
800 	}
801 
802 	client->internal.remaining_payload -= ret;
803 
804 exit:
805 	mqtt_mutex_unlock(client);
806 
807 	return ret;
808 }
809 
/* Non-blocking variant of the publish payload read; see
 * read_publish_payload() for the return values.
 */
int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer,
			      size_t length)
{
	const bool shall_block = false;

	return read_publish_payload(client, buffer, length, shall_block);
}
815 
/* Blocking variant of the publish payload read; see
 * read_publish_payload() for the return values.
 */
int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer,
				       size_t length)
{
	const bool shall_block = true;

	return read_publish_payload(client, buffer, length, shall_block);
}
821 
/* Read exactly length bytes of publish payload into buffer, using blocking
 * reads until the buffer is full.
 *
 * Returns 0 once length bytes have been read, the negative error of a
 * failed read, or -EIO when a read returns 0 before the buffer is full.
 */
int mqtt_readall_publish_payload(struct mqtt_client *client, uint8_t *buffer,
				 size_t length)
{
	size_t done = 0;

	while (done < length) {
		int ret = mqtt_read_publish_payload_blocking(client,
							     buffer + done,
							     length - done);

		if (ret == 0) {
			return -EIO;
		}

		if (ret < 0) {
			return ret;
		}

		done += ret;
	}

	return 0;
}
842