1 /*
2 * Copyright (c) 2018 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL);
9
10 #include "mqtt_internal.h"
11 #include "mqtt_transport.h"
12 #include "mqtt_os.h"
13
14 /** @file mqtt_rx.c
15 *
16 * @brief MQTT Received data handling.
17 */
18
mqtt_handle_packet(struct mqtt_client * client,uint8_t type_and_flags,uint32_t var_length,struct buf_ctx * buf)19 static int mqtt_handle_packet(struct mqtt_client *client,
20 uint8_t type_and_flags,
21 uint32_t var_length,
22 struct buf_ctx *buf)
23 {
24 int err_code = 0;
25 bool notify_event = true;
26 struct mqtt_evt evt = { 0 };
27
28 /* Success by default, overwritten in special cases. */
29 evt.result = 0;
30
31 switch (type_and_flags & 0xF0) {
32 case MQTT_PKT_TYPE_CONNACK:
33 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
34
35 evt.type = MQTT_EVT_CONNACK;
36 err_code = connect_ack_decode(client, buf, &evt.param.connack);
37 if (err_code == 0) {
38 NET_DBG("[CID %p]: return_code: %d", client,
39 evt.param.connack.return_code);
40
41 /* For MQTT 5.0 this is still valid as MQTT_CONNACK_SUCCESS
42 * is encoded as 0 as well.
43 */
44 if (evt.param.connack.return_code ==
45 MQTT_CONNECTION_ACCEPTED) {
46 /* Set state. */
47 MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
48 } else {
49 err_code = -ECONNREFUSED;
50 }
51
52 evt.result = evt.param.connack.return_code;
53 } else {
54 evt.result = err_code;
55 }
56
57 break;
58
59 case MQTT_PKT_TYPE_PUBLISH:
60 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
61
62 evt.type = MQTT_EVT_PUBLISH;
63 err_code = publish_decode(client, type_and_flags, var_length,
64 buf, &evt.param.publish);
65 evt.result = err_code;
66
67 client->internal.remaining_payload =
68 evt.param.publish.message.payload.len;
69
70 NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x",
71 evt.param.publish.message.topic.qos,
72 evt.param.publish.message.payload.len,
73 evt.param.publish.message.topic.topic.size);
74
75 break;
76
77 case MQTT_PKT_TYPE_PUBACK:
78 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
79
80 evt.type = MQTT_EVT_PUBACK;
81 err_code = publish_ack_decode(client, buf, &evt.param.puback);
82 evt.result = err_code;
83 break;
84
85 case MQTT_PKT_TYPE_PUBREC:
86 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
87
88 evt.type = MQTT_EVT_PUBREC;
89 err_code = publish_receive_decode(client, buf,
90 &evt.param.pubrec);
91 evt.result = err_code;
92 break;
93
94 case MQTT_PKT_TYPE_PUBREL:
95 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
96
97 evt.type = MQTT_EVT_PUBREL;
98 err_code = publish_release_decode(client, buf,
99 &evt.param.pubrel);
100 evt.result = err_code;
101 break;
102
103 case MQTT_PKT_TYPE_PUBCOMP:
104 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
105
106 evt.type = MQTT_EVT_PUBCOMP;
107 err_code = publish_complete_decode(client, buf,
108 &evt.param.pubcomp);
109 evt.result = err_code;
110 break;
111
112 case MQTT_PKT_TYPE_SUBACK:
113 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
114
115 evt.type = MQTT_EVT_SUBACK;
116 err_code = subscribe_ack_decode(client, buf, &evt.param.suback);
117 evt.result = err_code;
118 break;
119
120 case MQTT_PKT_TYPE_UNSUBACK:
121 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
122
123 evt.type = MQTT_EVT_UNSUBACK;
124 err_code = unsubscribe_ack_decode(client, buf,
125 &evt.param.unsuback);
126 evt.result = err_code;
127 break;
128
129 case MQTT_PKT_TYPE_PINGRSP:
130 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
131
132 if (client->unacked_ping <= 0) {
133 NET_WARN("Unexpected PINGRSP");
134 client->unacked_ping = 0;
135 } else {
136 client->unacked_ping--;
137 }
138
139 evt.type = MQTT_EVT_PINGRESP;
140 break;
141
142 #if defined(CONFIG_MQTT_VERSION_5_0)
143 case MQTT_PKT_TYPE_DISCONNECT:
144 evt.type = MQTT_EVT_DISCONNECT;
145 err_code = disconnect_decode(client, buf, &evt.param.disconnect);
146 if (err_code == 0) {
147 evt.result = evt.param.disconnect.reason_code;
148 /* Don't notify yet, the code below will handle this. */
149 mqtt_client_disconnect(client, evt.result, false);
150 } else {
151 /* Again, don't notify yet, error handling code will
152 * disconnect and report error.
153 */
154 notify_event = false;
155 }
156
157 break;
158
159 case MQTT_PKT_TYPE_AUTH:
160 evt.type = MQTT_EVT_AUTH;
161 err_code = auth_decode(client, buf, &evt.param.auth);
162 if (err_code == 0) {
163 evt.result = evt.param.auth.reason_code;
164 } else {
165 notify_event = false;
166 }
167
168 break;
169 #endif
170
171 default:
172 /* Nothing to notify. */
173 notify_event = false;
174 break;
175 }
176
177 if (notify_event == true) {
178 event_notify(client, &evt);
179 }
180
181 return err_code;
182 }
183
mqtt_read_message_chunk(struct mqtt_client * client,struct buf_ctx * buf,uint32_t length)184 static int mqtt_read_message_chunk(struct mqtt_client *client,
185 struct buf_ctx *buf, uint32_t length)
186 {
187 uint32_t remaining;
188 int len;
189
190 /* In case all data requested has already been buffered, return. */
191 if (length <= (buf->end - buf->cur)) {
192 return 0;
193 }
194
195 /* Calculate how much data we need to read from the transport,
196 * given the already buffered data.
197 */
198 remaining = length - (buf->end - buf->cur);
199
200 /* Check if read does not exceed the buffer. */
201 if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) ||
202 (buf->end + remaining < client->rx_buf)) {
203 NET_ERR("[CID %p]: Read would exceed RX buffer bounds.",
204 client);
205 return -ENOMEM;
206 }
207
208 len = mqtt_transport_read(client, buf->end, remaining, false);
209 if (len < 0) {
210 if (len != -EAGAIN) {
211 NET_ERR("[CID %p]: Transport read error: %d", client, len);
212 }
213 return len;
214 }
215
216 if (len == 0) {
217 NET_ERR("[CID %p]: Connection closed.", client);
218 return -ENOTCONN;
219 }
220
221 client->internal.rx_buf_datalen += len;
222 buf->end += len;
223
224 if (len < remaining) {
225 NET_ERR("[CID %p]: Message partially received.", client);
226 return -EAGAIN;
227 }
228
229 return 0;
230 }
231
mqtt_read_publish_var_header(struct mqtt_client * client,uint8_t type_and_flags,struct buf_ctx * buf)232 static int mqtt_read_publish_var_header(struct mqtt_client *client,
233 uint8_t type_and_flags,
234 struct buf_ctx *buf)
235 {
236 uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
237 int err_code;
238 uint32_t variable_header_length;
239
240 /* Read topic length field. */
241 err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t));
242 if (err_code < 0) {
243 return err_code;
244 }
245
246 variable_header_length = *buf->cur << 8; /* MSB */
247 variable_header_length |= *(buf->cur + 1); /* LSB */
248
249 /* Add two bytes for topic length field. */
250 variable_header_length += sizeof(uint16_t);
251
252 /* Add two bytes for message_id, if needed. */
253 if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
254 variable_header_length += sizeof(uint16_t);
255 }
256
257 if (mqtt_is_version_5_0(client)) {
258 struct buf_ctx backup;
259 uint8_t var_len = 1;
260 uint32_t prop_len = 0;
261
262 while (true) {
263 err_code = mqtt_read_message_chunk(
264 client, buf, variable_header_length + var_len);
265 if (err_code < 0) {
266 return err_code;
267 }
268
269 backup = *buf;
270 buf->cur += variable_header_length;
271
272 /* Try to decode variable integer, in case integer is
273 * not complete, read more bytes from the stream and retry.
274 */
275 err_code = unpack_variable_int(buf, &prop_len);
276 if (err_code >= 0) {
277 break;
278 }
279
280 if (err_code != -EAGAIN) {
281 return err_code;
282 }
283
284 /* Try again. */
285 var_len++;
286 *buf = backup;
287 }
288
289 *buf = backup;
290 variable_header_length += var_len + prop_len;
291 }
292
293 err_code = mqtt_read_message_chunk(client, buf,
294 variable_header_length);
295 if (err_code < 0) {
296 return err_code;
297 }
298
299 return 0;
300 }
301
mqtt_read_and_parse_fixed_header(struct mqtt_client * client,uint8_t * type_and_flags,uint32_t * var_length,struct buf_ctx * buf)302 static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
303 uint8_t *type_and_flags,
304 uint32_t *var_length,
305 struct buf_ctx *buf)
306 {
307 /* Read the mandatory part of the fixed header in first iteration. */
308 uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
309 int err_code;
310
311 do {
312 err_code = mqtt_read_message_chunk(client, buf, chunk_size);
313 if (err_code < 0) {
314 return err_code;
315 }
316
317 /* Reset to pointer to the beginning of the frame. */
318 buf->cur = client->rx_buf;
319 chunk_size = 1U;
320
321 err_code = fixed_header_decode(buf, type_and_flags, var_length);
322 } while (err_code == -EAGAIN);
323
324 return err_code;
325 }
326
mqtt_handle_rx(struct mqtt_client * client)327 int mqtt_handle_rx(struct mqtt_client *client)
328 {
329 int err_code;
330 uint8_t type_and_flags;
331 uint32_t var_length;
332 struct buf_ctx buf;
333
334 buf.cur = client->rx_buf;
335 buf.end = client->rx_buf + client->internal.rx_buf_datalen;
336
337 err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
338 &var_length, &buf);
339 if (err_code < 0) {
340 return (err_code == -EAGAIN) ? 0 : err_code;
341 }
342
343 if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
344 err_code = mqtt_read_publish_var_header(client, type_and_flags,
345 &buf);
346 } else {
347 err_code = mqtt_read_message_chunk(client, &buf, var_length);
348 }
349
350 if (err_code < 0) {
351 return (err_code == -EAGAIN) ? 0 : err_code;
352 }
353
354 /* At this point, packet is ready to be passed to the application. */
355 err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
356 if (err_code < 0) {
357 return err_code;
358 }
359
360 client->internal.rx_buf_datalen = 0U;
361
362 return 0;
363 }
364