1 /*
2  * Copyright (c) 2018 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL);
9 
10 #include "mqtt_internal.h"
11 #include "mqtt_transport.h"
12 #include "mqtt_os.h"
13 
14 /** @file mqtt_rx.c
15  *
16  * @brief MQTT Received data handling.
17  */
18 
mqtt_handle_packet(struct mqtt_client * client,uint8_t type_and_flags,uint32_t var_length,struct buf_ctx * buf)19 static int mqtt_handle_packet(struct mqtt_client *client,
20 			      uint8_t type_and_flags,
21 			      uint32_t var_length,
22 			      struct buf_ctx *buf)
23 {
24 	int err_code = 0;
25 	bool notify_event = true;
26 	struct mqtt_evt evt = { 0 };
27 
28 	/* Success by default, overwritten in special cases. */
29 	evt.result = 0;
30 
31 	switch (type_and_flags & 0xF0) {
32 	case MQTT_PKT_TYPE_CONNACK:
33 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
34 
35 		evt.type = MQTT_EVT_CONNACK;
36 		err_code = connect_ack_decode(client, buf, &evt.param.connack);
37 		if (err_code == 0) {
38 			NET_DBG("[CID %p]: return_code: %d", client,
39 				 evt.param.connack.return_code);
40 
41 			/* For MQTT 5.0 this is still valid as MQTT_CONNACK_SUCCESS
42 			 * is encoded as 0 as well.
43 			 */
44 			if (evt.param.connack.return_code ==
45 						MQTT_CONNECTION_ACCEPTED) {
46 				/* Set state. */
47 				MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
48 			} else {
49 				err_code = -ECONNREFUSED;
50 			}
51 
52 			evt.result = evt.param.connack.return_code;
53 		} else {
54 			evt.result = err_code;
55 		}
56 
57 		break;
58 
59 	case MQTT_PKT_TYPE_PUBLISH:
60 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
61 
62 		evt.type = MQTT_EVT_PUBLISH;
63 		err_code = publish_decode(client, type_and_flags, var_length,
64 					  buf, &evt.param.publish);
65 		evt.result = err_code;
66 
67 		client->internal.remaining_payload =
68 					evt.param.publish.message.payload.len;
69 
70 		NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x",
71 			 evt.param.publish.message.topic.qos,
72 			 evt.param.publish.message.payload.len,
73 			 evt.param.publish.message.topic.topic.size);
74 
75 		break;
76 
77 	case MQTT_PKT_TYPE_PUBACK:
78 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
79 
80 		evt.type = MQTT_EVT_PUBACK;
81 		err_code = publish_ack_decode(client, buf, &evt.param.puback);
82 		evt.result = err_code;
83 		break;
84 
85 	case MQTT_PKT_TYPE_PUBREC:
86 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
87 
88 		evt.type = MQTT_EVT_PUBREC;
89 		err_code = publish_receive_decode(client, buf,
90 						  &evt.param.pubrec);
91 		evt.result = err_code;
92 		break;
93 
94 	case MQTT_PKT_TYPE_PUBREL:
95 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
96 
97 		evt.type = MQTT_EVT_PUBREL;
98 		err_code = publish_release_decode(client, buf,
99 						  &evt.param.pubrel);
100 		evt.result = err_code;
101 		break;
102 
103 	case MQTT_PKT_TYPE_PUBCOMP:
104 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
105 
106 		evt.type = MQTT_EVT_PUBCOMP;
107 		err_code = publish_complete_decode(client, buf,
108 						   &evt.param.pubcomp);
109 		evt.result = err_code;
110 		break;
111 
112 	case MQTT_PKT_TYPE_SUBACK:
113 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
114 
115 		evt.type = MQTT_EVT_SUBACK;
116 		err_code = subscribe_ack_decode(client, buf, &evt.param.suback);
117 		evt.result = err_code;
118 		break;
119 
120 	case MQTT_PKT_TYPE_UNSUBACK:
121 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
122 
123 		evt.type = MQTT_EVT_UNSUBACK;
124 		err_code = unsubscribe_ack_decode(client, buf,
125 						  &evt.param.unsuback);
126 		evt.result = err_code;
127 		break;
128 
129 	case MQTT_PKT_TYPE_PINGRSP:
130 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
131 
132 		if (client->unacked_ping <= 0) {
133 			NET_WARN("Unexpected PINGRSP");
134 			client->unacked_ping = 0;
135 		} else {
136 			client->unacked_ping--;
137 		}
138 
139 		evt.type = MQTT_EVT_PINGRESP;
140 		break;
141 
142 #if defined(CONFIG_MQTT_VERSION_5_0)
143 	case MQTT_PKT_TYPE_DISCONNECT:
144 		evt.type = MQTT_EVT_DISCONNECT;
145 		err_code = disconnect_decode(client, buf, &evt.param.disconnect);
146 		if (err_code == 0) {
147 			evt.result = evt.param.disconnect.reason_code;
148 			/* Don't notify yet, the code below will handle this. */
149 			mqtt_client_disconnect(client, evt.result, false);
150 		} else {
151 			/* Again, don't notify yet, error handling code will
152 			 * disconnect and report error.
153 			 */
154 			notify_event = false;
155 		}
156 
157 		break;
158 
159 	case MQTT_PKT_TYPE_AUTH:
160 		evt.type = MQTT_EVT_AUTH;
161 		err_code = auth_decode(client, buf, &evt.param.auth);
162 		if (err_code == 0) {
163 			evt.result = evt.param.auth.reason_code;
164 		} else {
165 			notify_event = false;
166 		}
167 
168 		break;
169 #endif
170 
171 	default:
172 		/* Nothing to notify. */
173 		notify_event = false;
174 		break;
175 	}
176 
177 	if (notify_event == true) {
178 		event_notify(client, &evt);
179 	}
180 
181 	return err_code;
182 }
183 
mqtt_read_message_chunk(struct mqtt_client * client,struct buf_ctx * buf,uint32_t length)184 static int mqtt_read_message_chunk(struct mqtt_client *client,
185 				   struct buf_ctx *buf, uint32_t length)
186 {
187 	uint32_t remaining;
188 	int len;
189 
190 	/* In case all data requested has already been buffered, return. */
191 	if (length <= (buf->end - buf->cur)) {
192 		return 0;
193 	}
194 
195 	/* Calculate how much data we need to read from the transport,
196 	 * given the already buffered data.
197 	 */
198 	remaining = length - (buf->end - buf->cur);
199 
200 	/* Check if read does not exceed the buffer. */
201 	if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) ||
202 	    (buf->end + remaining < client->rx_buf)) {
203 		NET_ERR("[CID %p]: Read would exceed RX buffer bounds.",
204 			 client);
205 		return -ENOMEM;
206 	}
207 
208 	len = mqtt_transport_read(client, buf->end, remaining, false);
209 	if (len < 0) {
210 		if (len != -EAGAIN) {
211 			NET_ERR("[CID %p]: Transport read error: %d", client, len);
212 		}
213 		return len;
214 	}
215 
216 	if (len == 0) {
217 		NET_ERR("[CID %p]: Connection closed.", client);
218 		return -ENOTCONN;
219 	}
220 
221 	client->internal.rx_buf_datalen += len;
222 	buf->end += len;
223 
224 	if (len < remaining) {
225 		NET_ERR("[CID %p]: Message partially received.", client);
226 		return -EAGAIN;
227 	}
228 
229 	return 0;
230 }
231 
mqtt_read_publish_var_header(struct mqtt_client * client,uint8_t type_and_flags,struct buf_ctx * buf)232 static int mqtt_read_publish_var_header(struct mqtt_client *client,
233 					uint8_t type_and_flags,
234 					struct buf_ctx *buf)
235 {
236 	uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
237 	int err_code;
238 	uint32_t variable_header_length;
239 
240 	/* Read topic length field. */
241 	err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t));
242 	if (err_code < 0) {
243 		return err_code;
244 	}
245 
246 	variable_header_length = *buf->cur << 8; /* MSB */
247 	variable_header_length |= *(buf->cur + 1); /* LSB */
248 
249 	/* Add two bytes for topic length field. */
250 	variable_header_length += sizeof(uint16_t);
251 
252 	/* Add two bytes for message_id, if needed. */
253 	if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
254 		variable_header_length += sizeof(uint16_t);
255 	}
256 
257 	if (mqtt_is_version_5_0(client)) {
258 		struct buf_ctx backup;
259 		uint8_t var_len = 1;
260 		uint32_t prop_len = 0;
261 
262 		while (true) {
263 			err_code = mqtt_read_message_chunk(
264 				client, buf, variable_header_length + var_len);
265 			if (err_code < 0) {
266 				return err_code;
267 			}
268 
269 			backup = *buf;
270 			buf->cur += variable_header_length;
271 
272 			/* Try to decode variable integer, in case integer is
273 			 * not complete, read more bytes from the stream and retry.
274 			 */
275 			err_code = unpack_variable_int(buf, &prop_len);
276 			if (err_code >= 0) {
277 				break;
278 			}
279 
280 			if (err_code != -EAGAIN) {
281 				return err_code;
282 			}
283 
284 			/* Try again. */
285 			var_len++;
286 			*buf = backup;
287 		}
288 
289 		*buf = backup;
290 		variable_header_length += var_len + prop_len;
291 	}
292 
293 	err_code = mqtt_read_message_chunk(client, buf,
294 					   variable_header_length);
295 	if (err_code < 0) {
296 		return err_code;
297 	}
298 
299 	return 0;
300 }
301 
mqtt_read_and_parse_fixed_header(struct mqtt_client * client,uint8_t * type_and_flags,uint32_t * var_length,struct buf_ctx * buf)302 static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
303 					    uint8_t *type_and_flags,
304 					    uint32_t *var_length,
305 					    struct buf_ctx *buf)
306 {
307 	/* Read the mandatory part of the fixed header in first iteration. */
308 	uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
309 	int err_code;
310 
311 	do {
312 		err_code = mqtt_read_message_chunk(client, buf, chunk_size);
313 		if (err_code < 0) {
314 			return err_code;
315 		}
316 
317 		/* Reset to pointer to the beginning of the frame. */
318 		buf->cur = client->rx_buf;
319 		chunk_size = 1U;
320 
321 		err_code = fixed_header_decode(buf, type_and_flags, var_length);
322 	} while (err_code == -EAGAIN);
323 
324 	return err_code;
325 }
326 
mqtt_handle_rx(struct mqtt_client * client)327 int mqtt_handle_rx(struct mqtt_client *client)
328 {
329 	int err_code;
330 	uint8_t type_and_flags;
331 	uint32_t var_length;
332 	struct buf_ctx buf;
333 
334 	buf.cur = client->rx_buf;
335 	buf.end = client->rx_buf + client->internal.rx_buf_datalen;
336 
337 	err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
338 						    &var_length, &buf);
339 	if (err_code < 0) {
340 		return (err_code == -EAGAIN) ? 0 : err_code;
341 	}
342 
343 	if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
344 		err_code = mqtt_read_publish_var_header(client, type_and_flags,
345 							&buf);
346 	} else {
347 		err_code = mqtt_read_message_chunk(client, &buf, var_length);
348 	}
349 
350 	if (err_code < 0) {
351 		return (err_code == -EAGAIN) ? 0 : err_code;
352 	}
353 
354 	/* At this point, packet is ready to be passed to the application. */
355 	err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
356 	if (err_code < 0) {
357 		return err_code;
358 	}
359 
360 	client->internal.rx_buf_datalen = 0U;
361 
362 	return 0;
363 }
364