/* * Copyright (c) 2019 Intel Corporation * * SPDX-License-Identifier: Apache-2.0 */ #include LOG_MODULE_REGISTER(mqtt_azure, LOG_LEVEL_DBG); #include #include #include #include #include #include #include #include #include "config.h" #include "test_certs.h" /* Buffers for MQTT client. */ static uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE]; static uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE]; /* The mqtt client struct */ static struct mqtt_client client_ctx; /* MQTT Broker details. */ static struct sockaddr_storage broker; #if defined(CONFIG_SOCKS) static struct sockaddr socks5_proxy; #endif /* Socket Poll */ static struct pollfd fds[1]; static int nfds; static bool mqtt_connected; static struct k_work_delayable pub_message; #if defined(CONFIG_NET_DHCPV4) static struct k_work_delayable check_network_conn; /* Network Management events */ #define L4_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED) static struct net_mgmt_event_callback l4_mgmt_cb; #endif #if defined(CONFIG_DNS_RESOLVER) static struct addrinfo hints; static struct addrinfo *haddr; #endif static K_SEM_DEFINE(mqtt_start, 0, 1); /* Application TLS configuration details */ #define TLS_SNI_HOSTNAME CONFIG_SAMPLE_CLOUD_AZURE_HOSTNAME #define APP_CA_CERT_TAG 1 static const sec_tag_t m_sec_tags[] = { APP_CA_CERT_TAG, }; static uint8_t devbound_topic[] = "devices/" MQTT_CLIENTID "/messages/devicebound/#"; static struct mqtt_topic subs_topic; static struct mqtt_subscription_list subs_list; static void mqtt_event_handler(struct mqtt_client *const client, const struct mqtt_evt *evt); static int tls_init(void) { int err; err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE, ca_certificate, sizeof(ca_certificate)); if (err < 0) { LOG_ERR("Failed to register public certificate: %d", err); return err; } return err; } static void prepare_fds(struct mqtt_client *client) { if (client->transport.type == MQTT_TRANSPORT_SECURE) { fds[0].fd = client->transport.tls.sock; } fds[0].events = POLLIN; nfds = 1; } static void clear_fds(void) { nfds = 0; } static int wait(int timeout) { int rc = -EINVAL; if (nfds <= 0) { return rc; } rc = poll(fds, nfds, timeout); if (rc < 0) { LOG_ERR("poll error: %d", errno); return -errno; } return rc; } static void broker_init(void) { struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; broker4->sin_family = AF_INET; broker4->sin_port = htons(SERVER_PORT); #if defined(CONFIG_DNS_RESOLVER) net_ipaddr_copy(&broker4->sin_addr, &net_sin(haddr->ai_addr)->sin_addr); #else inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr); #endif #if defined(CONFIG_SOCKS) struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy; proxy4->sin_family = AF_INET; proxy4->sin_port = htons(SOCKS5_PROXY_PORT); inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr); #endif } static void client_init(struct mqtt_client *client) { static struct mqtt_utf8 password; static struct mqtt_utf8 username; struct mqtt_sec_config *tls_config; mqtt_client_init(client); broker_init(); /* MQTT client configuration */ client->broker = &broker; client->evt_cb = mqtt_event_handler; client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID; client->client_id.size = strlen(MQTT_CLIENTID); password.utf8 = (uint8_t *)CONFIG_SAMPLE_CLOUD_AZURE_PASSWORD; password.size = strlen(CONFIG_SAMPLE_CLOUD_AZURE_PASSWORD); client->password = &password; username.utf8 = (uint8_t *)CONFIG_SAMPLE_CLOUD_AZURE_USERNAME; username.size = strlen(CONFIG_SAMPLE_CLOUD_AZURE_USERNAME); client->user_name = &username; client->protocol_version = MQTT_VERSION_3_1_1; /* MQTT buffers configuration */ client->rx_buf = rx_buffer; client->rx_buf_size = sizeof(rx_buffer); client->tx_buf = tx_buffer; client->tx_buf_size = sizeof(tx_buffer); /* MQTT transport configuration */ client->transport.type = MQTT_TRANSPORT_SECURE; tls_config = &client->transport.tls.config; tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED; tls_config->cipher_list = NULL; tls_config->sec_tag_list = m_sec_tags; tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags); tls_config->hostname = TLS_SNI_HOSTNAME; #if defined(CONFIG_SOCKS) mqtt_client_set_proxy(client, &socks5_proxy, socks5_proxy.sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); #endif } static void mqtt_event_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) { struct mqtt_puback_param puback; uint8_t data[33]; int len; int bytes_read; switch (evt->type) { case MQTT_EVT_SUBACK: LOG_INF("SUBACK packet id: %u", evt->param.suback.message_id); break; case MQTT_EVT_UNSUBACK: LOG_INF("UNSUBACK packet id: %u", evt->param.suback.message_id); break; case MQTT_EVT_CONNACK: if (evt->result) { LOG_ERR("MQTT connect failed %d", evt->result); break; } mqtt_connected = true; LOG_DBG("MQTT client connected!"); break; case MQTT_EVT_DISCONNECT: LOG_DBG("MQTT client disconnected %d", evt->result); mqtt_connected = false; clear_fds(); break; case MQTT_EVT_PUBACK: if (evt->result) { LOG_ERR("MQTT PUBACK error %d", evt->result); break; } LOG_DBG("PUBACK packet id: %u\n", evt->param.puback.message_id); break; case MQTT_EVT_PUBLISH: len = evt->param.publish.message.payload.len; LOG_INF("MQTT publish received %d, %d bytes", evt->result, len); LOG_INF(" id: %d, qos: %d", evt->param.publish.message_id, evt->param.publish.message.topic.qos); while (len) { bytes_read = mqtt_read_publish_payload(&client_ctx, data, len >= sizeof(data) - 1 ? sizeof(data) - 1 : len); if (bytes_read < 0 && bytes_read != -EAGAIN) { LOG_ERR("failure to read payload"); break; } data[bytes_read] = '\0'; LOG_INF(" payload: %s", data); len -= bytes_read; } puback.message_id = evt->param.publish.message_id; mqtt_publish_qos1_ack(&client_ctx, &puback); break; default: LOG_DBG("Unhandled MQTT event %d", evt->type); break; } } static void subscribe(struct mqtt_client *client) { int err; /* subscribe */ subs_topic.topic.utf8 = devbound_topic; subs_topic.topic.size = strlen(devbound_topic); subs_list.list = &subs_topic; subs_list.list_count = 1U; subs_list.message_id = 1U; err = mqtt_subscribe(client, &subs_list); if (err) { LOG_ERR("Failed on topic %s", devbound_topic); } } static int publish(struct mqtt_client *client, enum mqtt_qos qos) { char payload[] = "{id=123}"; char evt_topic[] = "devices/" MQTT_CLIENTID "/messages/events/"; uint8_t len = strlen(evt_topic); struct mqtt_publish_param param; param.message.topic.qos = qos; param.message.topic.topic.utf8 = (uint8_t *)evt_topic; param.message.topic.topic.size = len; param.message.payload.data = payload; param.message.payload.len = strlen(payload); param.message_id = sys_rand16_get(); param.dup_flag = 0U; param.retain_flag = 0U; return mqtt_publish(client, ¶m); } static void poll_mqtt(void) { int rc; while (mqtt_connected) { rc = wait(SYS_FOREVER_MS); if (rc > 0) { mqtt_input(&client_ctx); } } } /* Random time between 10 - 15 seconds * If you prefer to have this value more than CONFIG_MQTT_KEEPALIVE, * then keep the application connection live by calling mqtt_live() * in regular intervals. */ static uint8_t timeout_for_publish(void) { return (10 + sys_rand8_get() % 5); } static void publish_timeout(struct k_work *work) { int rc; if (!mqtt_connected) { return; } rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE); if (rc) { LOG_ERR("mqtt_publish ERROR"); goto end; } LOG_DBG("mqtt_publish OK"); end: k_work_reschedule(&pub_message, K_SECONDS(timeout_for_publish())); } static int try_to_connect(struct mqtt_client *client) { uint8_t retries = 3U; int rc; LOG_DBG("attempting to connect..."); while (retries--) { client_init(client); rc = mqtt_connect(client); if (rc) { LOG_ERR("mqtt_connect failed %d", rc); continue; } prepare_fds(client); rc = wait(APP_SLEEP_MSECS); if (rc < 0) { mqtt_abort(client); return rc; } mqtt_input(client); if (mqtt_connected) { subscribe(client); k_work_reschedule(&pub_message, K_SECONDS(timeout_for_publish())); return 0; } mqtt_abort(client); wait(10 * MSEC_PER_SEC); } return -EINVAL; } #if defined(CONFIG_DNS_RESOLVER) static int get_mqtt_broker_addrinfo(void) { int retries = 3; int rc = -EINVAL; while (retries--) { hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = 0; rc = getaddrinfo(CONFIG_SAMPLE_CLOUD_AZURE_HOSTNAME, "8883", &hints, &haddr); if (rc == 0) { LOG_INF("DNS resolved for %s:%d", CONFIG_SAMPLE_CLOUD_AZURE_HOSTNAME, CONFIG_SAMPLE_CLOUD_AZURE_SERVER_PORT); return 0; } LOG_ERR("DNS not resolved for %s:%d, retrying", CONFIG_SAMPLE_CLOUD_AZURE_HOSTNAME, CONFIG_SAMPLE_CLOUD_AZURE_SERVER_PORT); } return rc; } #endif static void connect_to_cloud_and_publish(void) { int rc = -EINVAL; #if defined(CONFIG_NET_DHCPV4) while (true) { k_sem_take(&mqtt_start, K_FOREVER); #endif #if defined(CONFIG_DNS_RESOLVER) rc = get_mqtt_broker_addrinfo(); if (rc) { return; } #endif rc = try_to_connect(&client_ctx); if (rc) { return; } poll_mqtt(); #if defined(CONFIG_NET_DHCPV4) } #endif } /* DHCP tries to renew the address after interface is down and up. * If DHCPv4 address renewal is success, then it doesn't generate * any event. We have to monitor this way. * If DHCPv4 attempts exceeds maximum number, it will delete iface * address and attempts for new request. In this case we can rely * on IPV4_ADDR_ADD event. */ #if defined(CONFIG_NET_DHCPV4) static void check_network_connection(struct k_work *work) { struct net_if *iface; if (mqtt_connected) { return; } iface = net_if_get_default(); if (!iface) { goto end; } if (iface->config.dhcpv4.state == NET_DHCPV4_BOUND) { k_sem_give(&mqtt_start); return; } LOG_INF("waiting for DHCP to acquire addr"); end: k_work_reschedule(&check_network_conn, K_SECONDS(3)); } #endif #if defined(CONFIG_NET_DHCPV4) static void abort_mqtt_connection(void) { if (mqtt_connected) { mqtt_connected = false; mqtt_abort(&client_ctx); k_work_cancel_delayable(&pub_message); } } static void l4_event_handler(struct net_mgmt_event_callback *cb, uint64_t mgmt_event, struct net_if *iface) { if ((mgmt_event & L4_EVENT_MASK) != mgmt_event) { return; } if (mgmt_event == NET_EVENT_L4_CONNECTED) { /* Wait for DHCP to be back in BOUND state */ k_work_reschedule(&check_network_conn, K_SECONDS(3)); return; } if (mgmt_event == NET_EVENT_L4_DISCONNECTED) { abort_mqtt_connection(); k_work_cancel_delayable(&check_network_conn); return; } } #endif int main(void) { int rc; LOG_DBG("Waiting for network to setup..."); rc = tls_init(); if (rc) { return 0; } k_work_init_delayable(&pub_message, publish_timeout); #if defined(CONFIG_NET_DHCPV4) k_work_init_delayable(&check_network_conn, check_network_connection); net_mgmt_init_event_callback(&l4_mgmt_cb, l4_event_handler, L4_EVENT_MASK); net_mgmt_add_event_callback(&l4_mgmt_cb); #endif connect_to_cloud_and_publish(); return 0; }