1 /*
2  * Copyright (c) 2017 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
9 
10 #include <zephyr/kernel.h>
11 #include <zephyr/net/socket.h>
12 #include <zephyr/net/mqtt.h>
13 #include <zephyr/random/random.h>
14 #if defined(CONFIG_LOG_BACKEND_MQTT)
15 #include <zephyr/logging/log_backend_mqtt.h>
16 #endif
17 
18 #include <string.h>
19 #include <errno.h>
20 
21 #include "config.h"
22 #include "net_sample_common.h"
23 
24 #if defined(CONFIG_USERSPACE)
25 #include <zephyr/app_memory/app_memdomain.h>
26 K_APPMEM_PARTITION_DEFINE(app_partition);
27 struct k_mem_domain app_domain;
28 #define APP_BMEM K_APP_BMEM(app_partition)
29 #define APP_DMEM K_APP_DMEM(app_partition)
30 #else
31 #define APP_BMEM
32 #define APP_DMEM
33 #endif
34 
35 /* Buffers for MQTT client. */
36 static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
37 static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
38 
39 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
40 /* Making RX buffer large enough that the full IPv6 packet can fit into it */
41 #define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280
42 
43 /* Websocket needs temporary buffer to store partial packets */
44 static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN];
45 #endif
46 
47 /* The mqtt client struct */
48 static APP_BMEM struct mqtt_client client_ctx;
49 
50 /* MQTT Broker details. */
51 static APP_BMEM struct sockaddr_storage broker;
52 
53 #if defined(CONFIG_SOCKS)
54 static APP_BMEM struct sockaddr socks5_proxy;
55 #endif
56 
57 static APP_BMEM struct pollfd fds[1];
58 static APP_BMEM int nfds;
59 static APP_BMEM bool connected;
60 
61 /* Whether to include full topic in the publish message, or alias only (MQTT 5). */
62 static APP_BMEM bool include_topic;
63 static APP_BMEM bool aliases_enabled;
64 
65 #define APP_TOPIC_ALIAS 1
66 
67 #if defined(CONFIG_MQTT_LIB_TLS)
68 
69 #include "test_certs.h"
70 
71 #define TLS_SNI_HOSTNAME "localhost"
72 #define APP_CA_CERT_TAG 1
73 #define APP_PSK_TAG 2
74 
75 static APP_DMEM sec_tag_t m_sec_tags[] = {
76 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
77 		APP_CA_CERT_TAG,
78 #endif
79 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
80 		APP_PSK_TAG,
81 #endif
82 };
83 
tls_init(void)84 static int tls_init(void)
85 {
86 	int err = -EINVAL;
87 
88 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
89 	err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
90 				 ca_certificate, sizeof(ca_certificate));
91 	if (err < 0) {
92 		LOG_ERR("Failed to register public certificate: %d", err);
93 		return err;
94 	}
95 #endif
96 
97 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
98 	err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
99 				 client_psk, sizeof(client_psk));
100 	if (err < 0) {
101 		LOG_ERR("Failed to register PSK: %d", err);
102 		return err;
103 	}
104 
105 	err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
106 				 client_psk_id, sizeof(client_psk_id) - 1);
107 	if (err < 0) {
108 		LOG_ERR("Failed to register PSK ID: %d", err);
109 	}
110 #endif
111 
112 	return err;
113 }
114 
115 #endif /* CONFIG_MQTT_LIB_TLS */
116 
prepare_fds(struct mqtt_client * client)117 static void prepare_fds(struct mqtt_client *client)
118 {
119 	if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
120 		fds[0].fd = client->transport.tcp.sock;
121 	}
122 #if defined(CONFIG_MQTT_LIB_TLS)
123 	else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
124 		fds[0].fd = client->transport.tls.sock;
125 	}
126 #endif
127 
128 	fds[0].events = POLLIN;
129 	nfds = 1;
130 }
131 
clear_fds(void)132 static void clear_fds(void)
133 {
134 	nfds = 0;
135 }
136 
wait(int timeout)137 static int wait(int timeout)
138 {
139 	int ret = 0;
140 
141 	if (nfds > 0) {
142 		ret = poll(fds, nfds, timeout);
143 		if (ret < 0) {
144 			LOG_ERR("poll error: %d", errno);
145 		}
146 	}
147 
148 	return ret;
149 }
150 
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)151 void mqtt_evt_handler(struct mqtt_client *const client,
152 		      const struct mqtt_evt *evt)
153 {
154 	int err;
155 
156 	switch (evt->type) {
157 	case MQTT_EVT_CONNACK:
158 		if (evt->result != 0) {
159 			LOG_ERR("MQTT connect failed %d", evt->result);
160 			break;
161 		}
162 
163 		connected = true;
164 		LOG_INF("MQTT client connected!");
165 
166 #if defined(CONFIG_MQTT_VERSION_5_0)
167 		if (evt->param.connack.prop.rx.has_topic_alias_maximum &&
168 		    evt->param.connack.prop.topic_alias_maximum > 0) {
169 			LOG_INF("Topic aliases allowed by the broker, max %u.",
170 				evt->param.connack.prop.topic_alias_maximum);
171 
172 			aliases_enabled = true;
173 		} else {
174 			LOG_INF("Topic aliases disallowed by the broker.");
175 		}
176 #endif
177 
178 #if defined(CONFIG_LOG_BACKEND_MQTT)
179 		log_backend_mqtt_client_set(client);
180 #endif
181 
182 		break;
183 
184 	case MQTT_EVT_DISCONNECT:
185 		LOG_INF("MQTT client disconnected %d", evt->result);
186 
187 		connected = false;
188 		clear_fds();
189 
190 #if defined(CONFIG_LOG_BACKEND_MQTT)
191 		log_backend_mqtt_client_set(NULL);
192 #endif
193 
194 		break;
195 
196 	case MQTT_EVT_PUBACK:
197 		if (evt->result != 0) {
198 			LOG_ERR("MQTT PUBACK error %d", evt->result);
199 			break;
200 		}
201 
202 		LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
203 
204 		break;
205 
206 	case MQTT_EVT_PUBREC:
207 		if (evt->result != 0) {
208 			LOG_ERR("MQTT PUBREC error %d", evt->result);
209 			break;
210 		}
211 
212 		LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);
213 
214 		const struct mqtt_pubrel_param rel_param = {
215 			.message_id = evt->param.pubrec.message_id
216 		};
217 
218 		err = mqtt_publish_qos2_release(client, &rel_param);
219 		if (err != 0) {
220 			LOG_ERR("Failed to send MQTT PUBREL: %d", err);
221 		}
222 
223 		break;
224 
225 	case MQTT_EVT_PUBCOMP:
226 		if (evt->result != 0) {
227 			LOG_ERR("MQTT PUBCOMP error %d", evt->result);
228 			break;
229 		}
230 
231 		LOG_INF("PUBCOMP packet id: %u",
232 			evt->param.pubcomp.message_id);
233 
234 		break;
235 
236 	case MQTT_EVT_PINGRESP:
237 		LOG_INF("PINGRESP packet");
238 		break;
239 
240 	default:
241 		break;
242 	}
243 }
244 
get_mqtt_payload(enum mqtt_qos qos)245 static char *get_mqtt_payload(enum mqtt_qos qos)
246 {
247 #if APP_BLUEMIX_TOPIC
248 	static APP_BMEM char payload[30];
249 
250 	snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
251 		 sys_rand8_get());
252 #else
253 	static APP_DMEM char payload[] = "DOORS:OPEN_QoSx";
254 
255 	payload[strlen(payload) - 1] = '0' + qos;
256 #endif
257 
258 	return payload;
259 }
260 
get_mqtt_topic(void)261 static char *get_mqtt_topic(void)
262 {
263 #if APP_BLUEMIX_TOPIC
264 	return "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID
265 	       "/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT;
266 #else
267 	return "sensors";
268 #endif
269 }
270 
publish(struct mqtt_client * client,enum mqtt_qos qos)271 static int publish(struct mqtt_client *client, enum mqtt_qos qos)
272 {
273 	struct mqtt_publish_param param = { 0 };
274 
275 	/* Always true for MQTT 3.1.1.
276 	 * True only on first publish message for MQTT 5.0 if broker allows aliases.
277 	 */
278 	if (include_topic) {
279 		param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
280 		param.message.topic.topic.size =
281 			strlen(param.message.topic.topic.utf8);
282 	}
283 
284 	param.message.topic.qos = qos;
285 	param.message.payload.data = get_mqtt_payload(qos);
286 	param.message.payload.len =
287 			strlen(param.message.payload.data);
288 	param.message_id = sys_rand16_get();
289 	param.dup_flag = 0U;
290 	param.retain_flag = 0U;
291 
292 #if defined(CONFIG_MQTT_VERSION_5_0)
293 	if (aliases_enabled) {
294 		param.prop.topic_alias = APP_TOPIC_ALIAS;
295 		include_topic = false;
296 	}
297 #endif
298 
299 	return mqtt_publish(client, &param);
300 }
301 
302 #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
303 
304 #define PRINT_RESULT(func, rc) \
305 	LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
306 
broker_init(void)307 static void broker_init(void)
308 {
309 #if defined(CONFIG_NET_IPV6)
310 	struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
311 
312 	broker6->sin6_family = AF_INET6;
313 	broker6->sin6_port = htons(SERVER_PORT);
314 	inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
315 
316 #if defined(CONFIG_SOCKS)
317 	struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
318 
319 	proxy6->sin6_family = AF_INET6;
320 	proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
321 	inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
322 #endif
323 #else
324 	struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
325 
326 	broker4->sin_family = AF_INET;
327 	broker4->sin_port = htons(SERVER_PORT);
328 	inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
329 #if defined(CONFIG_SOCKS)
330 	struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
331 
332 	proxy4->sin_family = AF_INET;
333 	proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
334 	inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
335 #endif
336 #endif
337 }
338 
client_init(struct mqtt_client * client)339 static void client_init(struct mqtt_client *client)
340 {
341 	mqtt_client_init(client);
342 
343 	broker_init();
344 
345 	/* MQTT client configuration */
346 	client->broker = &broker;
347 	client->evt_cb = mqtt_evt_handler;
348 	client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
349 	client->client_id.size = strlen(MQTT_CLIENTID);
350 	client->password = NULL;
351 	client->user_name = NULL;
352 #if defined(CONFIG_MQTT_VERSION_5_0)
353 	client->protocol_version = MQTT_VERSION_5_0;
354 #else
355 	client->protocol_version = MQTT_VERSION_3_1_1;
356 #endif
357 
358 	/* MQTT buffers configuration */
359 	client->rx_buf = rx_buffer;
360 	client->rx_buf_size = sizeof(rx_buffer);
361 	client->tx_buf = tx_buffer;
362 	client->tx_buf_size = sizeof(tx_buffer);
363 
364 	/* MQTT transport configuration */
365 #if defined(CONFIG_MQTT_LIB_TLS)
366 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
367 	client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET;
368 #else
369 	client->transport.type = MQTT_TRANSPORT_SECURE;
370 #endif
371 
372 	struct mqtt_sec_config *tls_config = &client->transport.tls.config;
373 
374 	tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
375 	tls_config->cipher_list = NULL;
376 	tls_config->sec_tag_list = m_sec_tags;
377 	tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
378 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
379 	tls_config->hostname = TLS_SNI_HOSTNAME;
380 #else
381 	tls_config->hostname = NULL;
382 #endif
383 
384 #else
385 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
386 	client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET;
387 #else
388 	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
389 #endif
390 #endif
391 
392 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
393 	client->transport.websocket.config.host = SERVER_ADDR;
394 	client->transport.websocket.config.url = "/mqtt";
395 	client->transport.websocket.config.tmp_buf = temp_ws_rx_buf;
396 	client->transport.websocket.config.tmp_buf_len =
397 						sizeof(temp_ws_rx_buf);
398 	client->transport.websocket.timeout = 5 * MSEC_PER_SEC;
399 #endif
400 
401 #if defined(CONFIG_SOCKS)
402 	mqtt_client_set_proxy(client, &socks5_proxy,
403 			      socks5_proxy.sa_family == AF_INET ?
404 			      sizeof(struct sockaddr_in) :
405 			      sizeof(struct sockaddr_in6));
406 #endif
407 }
408 
409 /* In this routine we block until the connected variable is 1 */
try_to_connect(struct mqtt_client * client)410 static int try_to_connect(struct mqtt_client *client)
411 {
412 	int rc, i = 0;
413 
414 	while (i++ < APP_CONNECT_TRIES && !connected) {
415 
416 		client_init(client);
417 
418 		rc = mqtt_connect(client);
419 		if (rc != 0) {
420 			PRINT_RESULT("mqtt_connect", rc);
421 			k_sleep(K_MSEC(APP_SLEEP_MSECS));
422 			continue;
423 		}
424 
425 		prepare_fds(client);
426 
427 		if (wait(APP_CONNECT_TIMEOUT_MS)) {
428 			mqtt_input(client);
429 		}
430 
431 		if (!connected) {
432 			mqtt_abort(client);
433 		}
434 	}
435 
436 	if (connected) {
437 		return 0;
438 	}
439 
440 	return -EINVAL;
441 }
442 
process_mqtt_and_sleep(struct mqtt_client * client,int timeout)443 static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
444 {
445 	int64_t remaining = timeout;
446 	int64_t start_time = k_uptime_get();
447 	int rc;
448 
449 	while (remaining > 0 && connected) {
450 		if (wait(remaining)) {
451 			rc = mqtt_input(client);
452 			if (rc != 0) {
453 				PRINT_RESULT("mqtt_input", rc);
454 				return rc;
455 			}
456 		}
457 
458 		rc = mqtt_live(client);
459 		if (rc != 0 && rc != -EAGAIN) {
460 			PRINT_RESULT("mqtt_live", rc);
461 			return rc;
462 		} else if (rc == 0) {
463 			rc = mqtt_input(client);
464 			if (rc != 0) {
465 				PRINT_RESULT("mqtt_input", rc);
466 				return rc;
467 			}
468 		}
469 
470 		remaining = timeout + start_time - k_uptime_get();
471 	}
472 
473 	return 0;
474 }
475 
476 #define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } }
477 #define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } }
478 
publisher(void)479 static int publisher(void)
480 {
481 	int i, rc, r = 0;
482 
483 	include_topic = true;
484 	aliases_enabled = false;
485 
486 	LOG_INF("attempting to connect: ");
487 	rc = try_to_connect(&client_ctx);
488 	PRINT_RESULT("try_to_connect", rc);
489 	SUCCESS_OR_EXIT(rc);
490 
491 	i = 0;
492 	while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
493 		r = -1;
494 
495 		rc = mqtt_ping(&client_ctx);
496 		PRINT_RESULT("mqtt_ping", rc);
497 		SUCCESS_OR_BREAK(rc);
498 
499 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
500 		SUCCESS_OR_BREAK(rc);
501 
502 		rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
503 		PRINT_RESULT("mqtt_publish", rc);
504 		SUCCESS_OR_BREAK(rc);
505 
506 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
507 		SUCCESS_OR_BREAK(rc);
508 
509 		rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
510 		PRINT_RESULT("mqtt_publish", rc);
511 		SUCCESS_OR_BREAK(rc);
512 
513 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
514 		SUCCESS_OR_BREAK(rc);
515 
516 		rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
517 		PRINT_RESULT("mqtt_publish", rc);
518 		SUCCESS_OR_BREAK(rc);
519 
520 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
521 		SUCCESS_OR_BREAK(rc);
522 
523 		r = 0;
524 	}
525 
526 	rc = mqtt_disconnect(&client_ctx, NULL);
527 	PRINT_RESULT("mqtt_disconnect", rc);
528 
529 	LOG_INF("Bye!");
530 
531 	return r;
532 }
533 
start_app(void)534 static int start_app(void)
535 {
536 	int r = 0, i = 0;
537 
538 	while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
539 	       i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
540 		r = publisher();
541 
542 		if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
543 			k_sleep(K_MSEC(5000));
544 		}
545 	}
546 
547 	return r;
548 }
549 
550 #if defined(CONFIG_USERSPACE)
551 #define STACK_SIZE 2048
552 
553 #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
554 #define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
555 #else
556 #define THREAD_PRIORITY K_PRIO_PREEMPT(8)
557 #endif
558 
559 K_THREAD_DEFINE(app_thread, STACK_SIZE,
560 		start_app, NULL, NULL, NULL,
561 		THREAD_PRIORITY, K_USER, -1);
562 
563 static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
564 #endif
565 
main(void)566 int main(void)
567 {
568 	wait_for_network();
569 
570 #if defined(CONFIG_MQTT_LIB_TLS)
571 	int rc;
572 
573 	rc = tls_init();
574 	PRINT_RESULT("tls_init", rc);
575 #endif
576 
577 #if defined(CONFIG_USERSPACE)
578 	int ret;
579 
580 	struct k_mem_partition *parts[] = {
581 #if Z_LIBC_PARTITION_EXISTS
582 		&z_libc_partition,
583 #endif
584 		&app_partition
585 	};
586 
587 	ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts);
588 	__ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret);
589 	ARG_UNUSED(ret);
590 
591 	k_mem_domain_add_thread(&app_domain, app_thread);
592 	k_thread_heap_assign(app_thread, &app_mem_pool);
593 
594 	k_thread_start(app_thread);
595 	k_thread_join(app_thread, K_FOREVER);
596 #else
597 	exit(start_app());
598 #endif
599 	return 0;
600 }
601