Lines Matching refs:client
21 static void client_reset(struct mqtt_client *client) in client_reset() argument
23 MQTT_STATE_INIT(client); in client_reset()
25 client->internal.last_activity = 0U; in client_reset()
26 client->internal.rx_buf_datalen = 0U; in client_reset()
27 client->internal.remaining_payload = 0U; in client_reset()
31 static void tx_buf_init(struct mqtt_client *client, struct buf_ctx *buf) in tx_buf_init() argument
33 memset(client->tx_buf, 0, client->tx_buf_size); in tx_buf_init()
34 buf->cur = client->tx_buf; in tx_buf_init()
35 buf->end = client->tx_buf + client->tx_buf_size; in tx_buf_init()
38 void event_notify(struct mqtt_client *client, const struct mqtt_evt *evt) in event_notify() argument
40 if (client->evt_cb != NULL) { in event_notify()
41 mqtt_mutex_unlock(client); in event_notify()
43 client->evt_cb(client, evt); in event_notify()
45 mqtt_mutex_lock(client); in event_notify()
49 void mqtt_client_disconnect(struct mqtt_client *client, int result, bool notify) in mqtt_client_disconnect() argument
53 err_code = mqtt_transport_disconnect(client); in mqtt_client_disconnect()
59 client_reset(client); in mqtt_client_disconnect()
68 event_notify(client, &evt); in mqtt_client_disconnect()
72 static int client_connect(struct mqtt_client *client) in client_connect() argument
77 err_code = mqtt_transport_connect(client); in client_connect()
82 tx_buf_init(client, &packet); in client_connect()
83 MQTT_SET_STATE(client, MQTT_STATE_TCP_CONNECTED); in client_connect()
85 err_code = connect_request_encode(client, &packet); in client_connect()
91 err_code = mqtt_transport_write(client, packet.cur, in client_connect()
97 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_connect()
100 client->unacked_ping = 0; in client_connect()
107 mqtt_client_disconnect(client, err_code, false); in client_connect()
111 static int client_write(struct mqtt_client *client, const uint8_t *data,
115 static void disconnect_5_0_notify(struct mqtt_client *client, int err) in disconnect_5_0_notify() argument
123 if (client->internal.disconnect_reason != MQTT_DISCONNECT_NORMAL) { in disconnect_5_0_notify()
124 param.reason_code = client->internal.disconnect_reason; in disconnect_5_0_notify()
146 tx_buf_init(client, &packet); in disconnect_5_0_notify()
148 if (disconnect_encode(client, ¶m, &packet) < 0) { in disconnect_5_0_notify()
152 (void)client_write(client, packet.cur, packet.end - packet.cur); in disconnect_5_0_notify()
155 static void disconnect_5_0_notify(struct mqtt_client *client, int err) in disconnect_5_0_notify() argument
157 ARG_UNUSED(client); in disconnect_5_0_notify()
163 static int client_read(struct mqtt_client *client) in client_read() argument
167 if (client->internal.remaining_payload > 0) { in client_read()
171 err_code = mqtt_handle_rx(client); in client_read()
173 if (mqtt_is_version_5_0(client)) { in client_read()
175 disconnect_5_0_notify(client, -err_code); in client_read()
178 mqtt_client_disconnect(client, err_code, true); in client_read()
184 static int client_write(struct mqtt_client *client, const uint8_t *data, in client_write() argument
189 NET_DBG("[%p]: Transport writing %d bytes.", client, datalen); in client_write()
191 err_code = mqtt_transport_write(client, data, datalen); in client_write()
195 mqtt_client_disconnect(client, err_code, true); in client_write()
199 NET_DBG("[%p]: Transport write complete.", client); in client_write()
200 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_write()
205 static int client_write_msg(struct mqtt_client *client, in client_write_msg() argument
210 NET_DBG("[%p]: Transport writing message.", client); in client_write_msg()
212 err_code = mqtt_transport_write_msg(client, message); in client_write_msg()
216 mqtt_client_disconnect(client, err_code, true); in client_write_msg()
220 NET_DBG("[%p]: Transport write complete.", client); in client_write_msg()
221 client->internal.last_activity = mqtt_sys_tick_in_ms_get(); in client_write_msg()
226 void mqtt_client_init(struct mqtt_client *client) in mqtt_client_init() argument
228 NULL_PARAM_CHECK_VOID(client); in mqtt_client_init()
230 memset(client, 0, sizeof(*client)); in mqtt_client_init()
232 MQTT_STATE_INIT(client); in mqtt_client_init()
233 mqtt_mutex_init(client); in mqtt_client_init()
235 client->protocol_version = IS_ENABLED(CONFIG_MQTT_VERSION_5_0) ? in mqtt_client_init()
237 client->clean_session = MQTT_CLEAN_SESSION; in mqtt_client_init()
238 client->keepalive = MQTT_KEEPALIVE; in mqtt_client_init()
242 int mqtt_client_set_proxy(struct mqtt_client *client, in mqtt_client_set_proxy() argument
247 if (!client || !proxy_addr) { in mqtt_client_set_proxy()
251 client->transport.proxy.addrlen = addrlen; in mqtt_client_set_proxy()
252 memcpy(&client->transport.proxy.addr, proxy_addr, addrlen); in mqtt_client_set_proxy()
261 int mqtt_connect(struct mqtt_client *client) in mqtt_connect() argument
265 NULL_PARAM_CHECK(client); in mqtt_connect()
266 NULL_PARAM_CHECK(client->client_id.utf8); in mqtt_connect()
268 mqtt_mutex_lock(client); in mqtt_connect()
270 if ((client->tx_buf == NULL) || (client->rx_buf == NULL)) { in mqtt_connect()
275 err_code = client_connect(client); in mqtt_connect()
279 client_reset(client); in mqtt_connect()
282 mqtt_mutex_unlock(client); in mqtt_connect()
287 static int verify_tx_state(const struct mqtt_client *client) in verify_tx_state() argument
289 if (!MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { in verify_tx_state()
296 int mqtt_publish(struct mqtt_client *client, in mqtt_publish() argument
304 NULL_PARAM_CHECK(client); in mqtt_publish()
308 "Data size 0x%08x", client, client->internal.state, in mqtt_publish()
312 mqtt_mutex_lock(client); in mqtt_publish()
314 tx_buf_init(client, &packet); in mqtt_publish()
316 err_code = verify_tx_state(client); in mqtt_publish()
321 err_code = publish_encode(client, param, &packet); in mqtt_publish()
336 err_code = client_write_msg(client, &msg); in mqtt_publish()
340 client, client->internal.state, err_code); in mqtt_publish()
342 mqtt_mutex_unlock(client); in mqtt_publish()
347 int mqtt_publish_qos1_ack(struct mqtt_client *client, in mqtt_publish_qos1_ack() argument
353 NULL_PARAM_CHECK(client); in mqtt_publish_qos1_ack()
357 client, client->internal.state, param->message_id); in mqtt_publish_qos1_ack()
359 mqtt_mutex_lock(client); in mqtt_publish_qos1_ack()
361 tx_buf_init(client, &packet); in mqtt_publish_qos1_ack()
363 err_code = verify_tx_state(client); in mqtt_publish_qos1_ack()
368 err_code = publish_ack_encode(client, param, &packet); in mqtt_publish_qos1_ack()
373 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos1_ack()
377 client, client->internal.state, err_code); in mqtt_publish_qos1_ack()
379 mqtt_mutex_unlock(client); in mqtt_publish_qos1_ack()
384 int mqtt_publish_qos2_receive(struct mqtt_client *client, in mqtt_publish_qos2_receive() argument
390 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_receive()
394 client, client->internal.state, param->message_id); in mqtt_publish_qos2_receive()
396 mqtt_mutex_lock(client); in mqtt_publish_qos2_receive()
398 tx_buf_init(client, &packet); in mqtt_publish_qos2_receive()
400 err_code = verify_tx_state(client); in mqtt_publish_qos2_receive()
405 err_code = publish_receive_encode(client, param, &packet); in mqtt_publish_qos2_receive()
410 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_receive()
414 client, client->internal.state, err_code); in mqtt_publish_qos2_receive()
416 mqtt_mutex_unlock(client); in mqtt_publish_qos2_receive()
421 int mqtt_publish_qos2_release(struct mqtt_client *client, in mqtt_publish_qos2_release() argument
427 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_release()
431 client, client->internal.state, param->message_id); in mqtt_publish_qos2_release()
433 mqtt_mutex_lock(client); in mqtt_publish_qos2_release()
435 tx_buf_init(client, &packet); in mqtt_publish_qos2_release()
437 err_code = verify_tx_state(client); in mqtt_publish_qos2_release()
442 err_code = publish_release_encode(client, param, &packet); in mqtt_publish_qos2_release()
447 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_release()
451 client, client->internal.state, err_code); in mqtt_publish_qos2_release()
453 mqtt_mutex_unlock(client); in mqtt_publish_qos2_release()
458 int mqtt_publish_qos2_complete(struct mqtt_client *client, in mqtt_publish_qos2_complete() argument
464 NULL_PARAM_CHECK(client); in mqtt_publish_qos2_complete()
468 client, client->internal.state, param->message_id); in mqtt_publish_qos2_complete()
470 mqtt_mutex_lock(client); in mqtt_publish_qos2_complete()
472 tx_buf_init(client, &packet); in mqtt_publish_qos2_complete()
474 err_code = verify_tx_state(client); in mqtt_publish_qos2_complete()
479 err_code = publish_complete_encode(client, param, &packet); in mqtt_publish_qos2_complete()
484 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_publish_qos2_complete()
491 client, client->internal.state, err_code); in mqtt_publish_qos2_complete()
493 mqtt_mutex_unlock(client); in mqtt_publish_qos2_complete()
498 int mqtt_disconnect(struct mqtt_client *client, in mqtt_disconnect() argument
504 NULL_PARAM_CHECK(client); in mqtt_disconnect()
506 mqtt_mutex_lock(client); in mqtt_disconnect()
508 tx_buf_init(client, &packet); in mqtt_disconnect()
510 err_code = verify_tx_state(client); in mqtt_disconnect()
515 err_code = disconnect_encode(client, param, &packet); in mqtt_disconnect()
520 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_disconnect()
525 mqtt_client_disconnect(client, 0, true); in mqtt_disconnect()
528 mqtt_mutex_unlock(client); in mqtt_disconnect()
533 int mqtt_subscribe(struct mqtt_client *client, in mqtt_subscribe() argument
539 NULL_PARAM_CHECK(client); in mqtt_subscribe()
543 "topic count 0x%04x", client, client->internal.state, in mqtt_subscribe()
546 mqtt_mutex_lock(client); in mqtt_subscribe()
548 tx_buf_init(client, &packet); in mqtt_subscribe()
550 err_code = verify_tx_state(client); in mqtt_subscribe()
555 err_code = subscribe_encode(client, param, &packet); in mqtt_subscribe()
560 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_subscribe()
564 client, client->internal.state, err_code); in mqtt_subscribe()
566 mqtt_mutex_unlock(client); in mqtt_subscribe()
571 int mqtt_unsubscribe(struct mqtt_client *client, in mqtt_unsubscribe() argument
577 NULL_PARAM_CHECK(client); in mqtt_unsubscribe()
580 mqtt_mutex_lock(client); in mqtt_unsubscribe()
582 tx_buf_init(client, &packet); in mqtt_unsubscribe()
584 err_code = verify_tx_state(client); in mqtt_unsubscribe()
589 err_code = unsubscribe_encode(client, param, &packet); in mqtt_unsubscribe()
594 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_unsubscribe()
597 mqtt_mutex_unlock(client); in mqtt_unsubscribe()
602 int mqtt_ping(struct mqtt_client *client) in mqtt_ping() argument
607 NULL_PARAM_CHECK(client); in mqtt_ping()
609 mqtt_mutex_lock(client); in mqtt_ping()
611 tx_buf_init(client, &packet); in mqtt_ping()
613 err_code = verify_tx_state(client); in mqtt_ping()
623 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_ping()
625 if (client->unacked_ping >= INT8_MAX) { in mqtt_ping()
628 client->unacked_ping++; in mqtt_ping()
632 mqtt_mutex_unlock(client); in mqtt_ping()
638 static int verify_auth_state(const struct mqtt_client *client) in verify_auth_state() argument
643 if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED) && in verify_auth_state()
644 !MQTT_HAS_STATE(client, MQTT_STATE_CONNECTED)) { in verify_auth_state()
652 int mqtt_auth(struct mqtt_client *client, const struct mqtt_auth_param *param) in mqtt_auth() argument
657 NULL_PARAM_CHECK(client); in mqtt_auth()
660 mqtt_mutex_lock(client); in mqtt_auth()
662 if (!mqtt_is_version_5_0(client)) { in mqtt_auth()
668 tx_buf_init(client, &packet); in mqtt_auth()
670 err_code = verify_auth_state(client); in mqtt_auth()
680 err_code = client_write(client, packet.cur, packet.end - packet.cur); in mqtt_auth()
683 mqtt_mutex_unlock(client); in mqtt_auth()
689 int mqtt_abort(struct mqtt_client *client) in mqtt_abort() argument
691 NULL_PARAM_CHECK(client); in mqtt_abort()
693 mqtt_mutex_lock(client); in mqtt_abort()
695 if (client->internal.state != MQTT_STATE_IDLE) { in mqtt_abort()
696 mqtt_client_disconnect(client, -ECONNABORTED, true); in mqtt_abort()
699 mqtt_mutex_unlock(client); in mqtt_abort()
704 int mqtt_live(struct mqtt_client *client) in mqtt_live() argument
710 NULL_PARAM_CHECK(client); in mqtt_live()
712 mqtt_mutex_lock(client); in mqtt_live()
715 client->internal.last_activity); in mqtt_live()
716 if ((client->keepalive > 0) && in mqtt_live()
717 (elapsed_time >= (client->keepalive * 1000))) { in mqtt_live()
718 err_code = mqtt_ping(client); in mqtt_live()
722 mqtt_mutex_unlock(client); in mqtt_live()
731 int mqtt_keepalive_time_left(const struct mqtt_client *client) in mqtt_keepalive_time_left() argument
734 client->internal.last_activity); in mqtt_keepalive_time_left()
735 uint32_t keepalive_ms = 1000U * client->keepalive; in mqtt_keepalive_time_left()
737 if (client->keepalive == 0) { in mqtt_keepalive_time_left()
749 int mqtt_input(struct mqtt_client *client) in mqtt_input() argument
753 NULL_PARAM_CHECK(client); in mqtt_input()
755 mqtt_mutex_lock(client); in mqtt_input()
757 NET_DBG("state:0x%08x", client->internal.state); in mqtt_input()
759 if (MQTT_HAS_STATE(client, MQTT_STATE_TCP_CONNECTED)) { in mqtt_input()
760 err_code = client_read(client); in mqtt_input()
765 mqtt_mutex_unlock(client); in mqtt_input()
770 static int read_publish_payload(struct mqtt_client *client, void *buffer, in read_publish_payload() argument
775 NULL_PARAM_CHECK(client); in read_publish_payload()
777 mqtt_mutex_lock(client); in read_publish_payload()
779 if (client->internal.remaining_payload == 0U) { in read_publish_payload()
784 if (client->internal.remaining_payload < length) { in read_publish_payload()
785 length = client->internal.remaining_payload; in read_publish_payload()
788 ret = mqtt_transport_read(client, buffer, length, shall_block); in read_publish_payload()
798 mqtt_client_disconnect(client, ret, true); in read_publish_payload()
802 client->internal.remaining_payload -= ret; in read_publish_payload()
805 mqtt_mutex_unlock(client); in read_publish_payload()
810 int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, in mqtt_read_publish_payload() argument
813 return read_publish_payload(client, buffer, length, false); in mqtt_read_publish_payload()
816 int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, in mqtt_read_publish_payload_blocking() argument
819 return read_publish_payload(client, buffer, length, true); in mqtt_read_publish_payload_blocking()
822 int mqtt_readall_publish_payload(struct mqtt_client *client, uint8_t *buffer, in mqtt_readall_publish_payload() argument
828 int ret = mqtt_read_publish_payload_blocking(client, buffer, in mqtt_readall_publish_payload()