1 /*
2 * Copyright (c) 2017 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
9
10 #include <zephyr/kernel.h>
11 #include <zephyr/net/socket.h>
12 #include <zephyr/net/mqtt.h>
13 #include <zephyr/random/random.h>
14 #if defined(CONFIG_LOG_BACKEND_MQTT)
15 #include <zephyr/logging/log_backend_mqtt.h>
16 #endif
17
18 #include <string.h>
19 #include <errno.h>
20
21 #include "config.h"
22 #include "net_sample_common.h"
23
24 #if defined(CONFIG_USERSPACE)
25 #include <zephyr/app_memory/app_memdomain.h>
26 K_APPMEM_PARTITION_DEFINE(app_partition);
27 struct k_mem_domain app_domain;
28 #define APP_BMEM K_APP_BMEM(app_partition)
29 #define APP_DMEM K_APP_DMEM(app_partition)
30 #else
31 #define APP_BMEM
32 #define APP_DMEM
33 #endif
34
35 /* Buffers for MQTT client. */
36 static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
37 static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
38
39 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
40 /* Making RX buffer large enough that the full IPv6 packet can fit into it */
41 #define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280
42
43 /* Websocket needs temporary buffer to store partial packets */
44 static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN];
45 #endif
46
47 /* The mqtt client struct */
48 static APP_BMEM struct mqtt_client client_ctx;
49
50 /* MQTT Broker details. */
51 static APP_BMEM struct sockaddr_storage broker;
52
53 #if defined(CONFIG_SOCKS)
54 static APP_BMEM struct sockaddr socks5_proxy;
55 #endif
56
57 static APP_BMEM struct pollfd fds[1];
58 static APP_BMEM int nfds;
59 static APP_BMEM bool connected;
60
61 /* Whether to include full topic in the publish message, or alias only (MQTT 5). */
62 static APP_BMEM bool include_topic;
63 static APP_BMEM bool aliases_enabled;
64
65 #define APP_TOPIC_ALIAS 1
66
67 #if defined(CONFIG_MQTT_LIB_TLS)
68
69 #include "test_certs.h"
70
71 #define TLS_SNI_HOSTNAME "localhost"
72 #define APP_CA_CERT_TAG 1
73 #define APP_PSK_TAG 2
74
75 static APP_DMEM sec_tag_t m_sec_tags[] = {
76 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
77 APP_CA_CERT_TAG,
78 #endif
79 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
80 APP_PSK_TAG,
81 #endif
82 };
83
tls_init(void)84 static int tls_init(void)
85 {
86 int err = -EINVAL;
87
88 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
89 err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
90 ca_certificate, sizeof(ca_certificate));
91 if (err < 0) {
92 LOG_ERR("Failed to register public certificate: %d", err);
93 return err;
94 }
95 #endif
96
97 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
98 err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
99 client_psk, sizeof(client_psk));
100 if (err < 0) {
101 LOG_ERR("Failed to register PSK: %d", err);
102 return err;
103 }
104
105 err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
106 client_psk_id, sizeof(client_psk_id) - 1);
107 if (err < 0) {
108 LOG_ERR("Failed to register PSK ID: %d", err);
109 }
110 #endif
111
112 return err;
113 }
114
115 #endif /* CONFIG_MQTT_LIB_TLS */
116
prepare_fds(struct mqtt_client * client)117 static void prepare_fds(struct mqtt_client *client)
118 {
119 if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
120 fds[0].fd = client->transport.tcp.sock;
121 }
122 #if defined(CONFIG_MQTT_LIB_TLS)
123 else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
124 fds[0].fd = client->transport.tls.sock;
125 }
126 #endif
127
128 fds[0].events = POLLIN;
129 nfds = 1;
130 }
131
clear_fds(void)132 static void clear_fds(void)
133 {
134 nfds = 0;
135 }
136
wait(int timeout)137 static int wait(int timeout)
138 {
139 int ret = 0;
140
141 if (nfds > 0) {
142 ret = poll(fds, nfds, timeout);
143 if (ret < 0) {
144 LOG_ERR("poll error: %d", errno);
145 }
146 }
147
148 return ret;
149 }
150
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)151 void mqtt_evt_handler(struct mqtt_client *const client,
152 const struct mqtt_evt *evt)
153 {
154 int err;
155
156 switch (evt->type) {
157 case MQTT_EVT_CONNACK:
158 if (evt->result != 0) {
159 LOG_ERR("MQTT connect failed %d", evt->result);
160 break;
161 }
162
163 connected = true;
164 LOG_INF("MQTT client connected!");
165
166 #if defined(CONFIG_MQTT_VERSION_5_0)
167 if (evt->param.connack.prop.rx.has_topic_alias_maximum &&
168 evt->param.connack.prop.topic_alias_maximum > 0) {
169 LOG_INF("Topic aliases allowed by the broker, max %u.",
170 evt->param.connack.prop.topic_alias_maximum);
171
172 aliases_enabled = true;
173 } else {
174 LOG_INF("Topic aliases disallowed by the broker.");
175 }
176 #endif
177
178 #if defined(CONFIG_LOG_BACKEND_MQTT)
179 log_backend_mqtt_client_set(client);
180 #endif
181
182 break;
183
184 case MQTT_EVT_DISCONNECT:
185 LOG_INF("MQTT client disconnected %d", evt->result);
186
187 connected = false;
188 clear_fds();
189
190 #if defined(CONFIG_LOG_BACKEND_MQTT)
191 log_backend_mqtt_client_set(NULL);
192 #endif
193
194 break;
195
196 case MQTT_EVT_PUBACK:
197 if (evt->result != 0) {
198 LOG_ERR("MQTT PUBACK error %d", evt->result);
199 break;
200 }
201
202 LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
203
204 break;
205
206 case MQTT_EVT_PUBREC:
207 if (evt->result != 0) {
208 LOG_ERR("MQTT PUBREC error %d", evt->result);
209 break;
210 }
211
212 LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);
213
214 const struct mqtt_pubrel_param rel_param = {
215 .message_id = evt->param.pubrec.message_id
216 };
217
218 err = mqtt_publish_qos2_release(client, &rel_param);
219 if (err != 0) {
220 LOG_ERR("Failed to send MQTT PUBREL: %d", err);
221 }
222
223 break;
224
225 case MQTT_EVT_PUBCOMP:
226 if (evt->result != 0) {
227 LOG_ERR("MQTT PUBCOMP error %d", evt->result);
228 break;
229 }
230
231 LOG_INF("PUBCOMP packet id: %u",
232 evt->param.pubcomp.message_id);
233
234 break;
235
236 case MQTT_EVT_PINGRESP:
237 LOG_INF("PINGRESP packet");
238 break;
239
240 default:
241 break;
242 }
243 }
244
get_mqtt_payload(enum mqtt_qos qos)245 static char *get_mqtt_payload(enum mqtt_qos qos)
246 {
247 #if APP_BLUEMIX_TOPIC
248 static APP_BMEM char payload[30];
249
250 snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
251 sys_rand8_get());
252 #else
253 static APP_DMEM char payload[] = "DOORS:OPEN_QoSx";
254
255 payload[strlen(payload) - 1] = '0' + qos;
256 #endif
257
258 return payload;
259 }
260
get_mqtt_topic(void)261 static char *get_mqtt_topic(void)
262 {
263 #if APP_BLUEMIX_TOPIC
264 return "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID
265 "/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT;
266 #else
267 return "sensors";
268 #endif
269 }
270
publish(struct mqtt_client * client,enum mqtt_qos qos)271 static int publish(struct mqtt_client *client, enum mqtt_qos qos)
272 {
273 struct mqtt_publish_param param = { 0 };
274
275 /* Always true for MQTT 3.1.1.
276 * True only on first publish message for MQTT 5.0 if broker allows aliases.
277 */
278 if (include_topic) {
279 param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
280 param.message.topic.topic.size =
281 strlen(param.message.topic.topic.utf8);
282 }
283
284 param.message.topic.qos = qos;
285 param.message.payload.data = get_mqtt_payload(qos);
286 param.message.payload.len =
287 strlen(param.message.payload.data);
288 param.message_id = sys_rand16_get();
289 param.dup_flag = 0U;
290 param.retain_flag = 0U;
291
292 #if defined(CONFIG_MQTT_VERSION_5_0)
293 if (aliases_enabled) {
294 param.prop.topic_alias = APP_TOPIC_ALIAS;
295 include_topic = false;
296 }
297 #endif
298
299 return mqtt_publish(client, ¶m);
300 }
301
302 #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
303
304 #define PRINT_RESULT(func, rc) \
305 LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
306
broker_init(void)307 static void broker_init(void)
308 {
309 #if defined(CONFIG_NET_IPV6)
310 struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
311
312 broker6->sin6_family = AF_INET6;
313 broker6->sin6_port = htons(SERVER_PORT);
314 inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
315
316 #if defined(CONFIG_SOCKS)
317 struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
318
319 proxy6->sin6_family = AF_INET6;
320 proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
321 inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
322 #endif
323 #else
324 struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
325
326 broker4->sin_family = AF_INET;
327 broker4->sin_port = htons(SERVER_PORT);
328 inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
329 #if defined(CONFIG_SOCKS)
330 struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
331
332 proxy4->sin_family = AF_INET;
333 proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
334 inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
335 #endif
336 #endif
337 }
338
client_init(struct mqtt_client * client)339 static void client_init(struct mqtt_client *client)
340 {
341 mqtt_client_init(client);
342
343 broker_init();
344
345 /* MQTT client configuration */
346 client->broker = &broker;
347 client->evt_cb = mqtt_evt_handler;
348 client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
349 client->client_id.size = strlen(MQTT_CLIENTID);
350 client->password = NULL;
351 client->user_name = NULL;
352 #if defined(CONFIG_MQTT_VERSION_5_0)
353 client->protocol_version = MQTT_VERSION_5_0;
354 #else
355 client->protocol_version = MQTT_VERSION_3_1_1;
356 #endif
357
358 /* MQTT buffers configuration */
359 client->rx_buf = rx_buffer;
360 client->rx_buf_size = sizeof(rx_buffer);
361 client->tx_buf = tx_buffer;
362 client->tx_buf_size = sizeof(tx_buffer);
363
364 /* MQTT transport configuration */
365 #if defined(CONFIG_MQTT_LIB_TLS)
366 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
367 client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET;
368 #else
369 client->transport.type = MQTT_TRANSPORT_SECURE;
370 #endif
371
372 struct mqtt_sec_config *tls_config = &client->transport.tls.config;
373
374 tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
375 tls_config->cipher_list = NULL;
376 tls_config->sec_tag_list = m_sec_tags;
377 tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
378 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
379 tls_config->hostname = TLS_SNI_HOSTNAME;
380 #else
381 tls_config->hostname = NULL;
382 #endif
383
384 #else
385 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
386 client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET;
387 #else
388 client->transport.type = MQTT_TRANSPORT_NON_SECURE;
389 #endif
390 #endif
391
392 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
393 client->transport.websocket.config.host = SERVER_ADDR;
394 client->transport.websocket.config.url = "/mqtt";
395 client->transport.websocket.config.tmp_buf = temp_ws_rx_buf;
396 client->transport.websocket.config.tmp_buf_len =
397 sizeof(temp_ws_rx_buf);
398 client->transport.websocket.timeout = 5 * MSEC_PER_SEC;
399 #endif
400
401 #if defined(CONFIG_SOCKS)
402 mqtt_client_set_proxy(client, &socks5_proxy,
403 socks5_proxy.sa_family == AF_INET ?
404 sizeof(struct sockaddr_in) :
405 sizeof(struct sockaddr_in6));
406 #endif
407 }
408
409 /* In this routine we block until the connected variable is 1 */
try_to_connect(struct mqtt_client * client)410 static int try_to_connect(struct mqtt_client *client)
411 {
412 int rc, i = 0;
413
414 while (i++ < APP_CONNECT_TRIES && !connected) {
415
416 client_init(client);
417
418 rc = mqtt_connect(client);
419 if (rc != 0) {
420 PRINT_RESULT("mqtt_connect", rc);
421 k_sleep(K_MSEC(APP_SLEEP_MSECS));
422 continue;
423 }
424
425 prepare_fds(client);
426
427 if (wait(APP_CONNECT_TIMEOUT_MS)) {
428 mqtt_input(client);
429 }
430
431 if (!connected) {
432 mqtt_abort(client);
433 }
434 }
435
436 if (connected) {
437 return 0;
438 }
439
440 return -EINVAL;
441 }
442
process_mqtt_and_sleep(struct mqtt_client * client,int timeout)443 static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
444 {
445 int64_t remaining = timeout;
446 int64_t start_time = k_uptime_get();
447 int rc;
448
449 while (remaining > 0 && connected) {
450 if (wait(remaining)) {
451 rc = mqtt_input(client);
452 if (rc != 0) {
453 PRINT_RESULT("mqtt_input", rc);
454 return rc;
455 }
456 }
457
458 rc = mqtt_live(client);
459 if (rc != 0 && rc != -EAGAIN) {
460 PRINT_RESULT("mqtt_live", rc);
461 return rc;
462 } else if (rc == 0) {
463 rc = mqtt_input(client);
464 if (rc != 0) {
465 PRINT_RESULT("mqtt_input", rc);
466 return rc;
467 }
468 }
469
470 remaining = timeout + start_time - k_uptime_get();
471 }
472
473 return 0;
474 }
475
476 #define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } }
477 #define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } }
478
publisher(void)479 static int publisher(void)
480 {
481 int i, rc, r = 0;
482
483 include_topic = true;
484 aliases_enabled = false;
485
486 LOG_INF("attempting to connect: ");
487 rc = try_to_connect(&client_ctx);
488 PRINT_RESULT("try_to_connect", rc);
489 SUCCESS_OR_EXIT(rc);
490
491 i = 0;
492 while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
493 r = -1;
494
495 rc = mqtt_ping(&client_ctx);
496 PRINT_RESULT("mqtt_ping", rc);
497 SUCCESS_OR_BREAK(rc);
498
499 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
500 SUCCESS_OR_BREAK(rc);
501
502 rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
503 PRINT_RESULT("mqtt_publish", rc);
504 SUCCESS_OR_BREAK(rc);
505
506 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
507 SUCCESS_OR_BREAK(rc);
508
509 rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
510 PRINT_RESULT("mqtt_publish", rc);
511 SUCCESS_OR_BREAK(rc);
512
513 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
514 SUCCESS_OR_BREAK(rc);
515
516 rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
517 PRINT_RESULT("mqtt_publish", rc);
518 SUCCESS_OR_BREAK(rc);
519
520 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
521 SUCCESS_OR_BREAK(rc);
522
523 r = 0;
524 }
525
526 rc = mqtt_disconnect(&client_ctx, NULL);
527 PRINT_RESULT("mqtt_disconnect", rc);
528
529 LOG_INF("Bye!");
530
531 return r;
532 }
533
start_app(void)534 static int start_app(void)
535 {
536 int r = 0, i = 0;
537
538 while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
539 i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
540 r = publisher();
541
542 if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
543 k_sleep(K_MSEC(5000));
544 }
545 }
546
547 return r;
548 }
549
550 #if defined(CONFIG_USERSPACE)
551 #define STACK_SIZE 2048
552
553 #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
554 #define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
555 #else
556 #define THREAD_PRIORITY K_PRIO_PREEMPT(8)
557 #endif
558
559 K_THREAD_DEFINE(app_thread, STACK_SIZE,
560 start_app, NULL, NULL, NULL,
561 THREAD_PRIORITY, K_USER, -1);
562
563 static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
564 #endif
565
main(void)566 int main(void)
567 {
568 wait_for_network();
569
570 #if defined(CONFIG_MQTT_LIB_TLS)
571 int rc;
572
573 rc = tls_init();
574 PRINT_RESULT("tls_init", rc);
575 #endif
576
577 #if defined(CONFIG_USERSPACE)
578 int ret;
579
580 struct k_mem_partition *parts[] = {
581 #if Z_LIBC_PARTITION_EXISTS
582 &z_libc_partition,
583 #endif
584 &app_partition
585 };
586
587 ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts);
588 __ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret);
589 ARG_UNUSED(ret);
590
591 k_mem_domain_add_thread(&app_domain, app_thread);
592 k_thread_heap_assign(app_thread, &app_mem_pool);
593
594 k_thread_start(app_thread);
595 k_thread_join(app_thread, K_FOREVER);
596 #else
597 exit(start_app());
598 #endif
599 return 0;
600 }
601