1 #include "iotx_dm_internal.h"
2
3 static dm_client_ctx_t g_dm_client_ctx = { 0 };
4
dm_client_get_ctx(void)5 static dm_client_ctx_t *dm_client_get_ctx(void)
6 {
7 return &g_dm_client_ctx;
8 }
9
dm_client_open(void)10 int dm_client_open(void)
11 {
12 int res = 0;
13 dm_client_ctx_t *ctx = dm_client_get_ctx();
14 iotx_cm_init_param_t cm_param;
15
16 memset(ctx, 0, sizeof(dm_client_ctx_t));
17 memset(&cm_param, 0, sizeof(iotx_cm_init_param_t));
18
19 cm_param.request_timeout_ms = IOTX_DM_CLIENT_REQUEST_TIMEOUT_MS;
20 cm_param.keepalive_interval_ms = IOTX_DM_CLIENT_KEEPALIVE_INTERVAL_MS;
21 cm_param.write_buf_size = CONFIG_MQTT_TX_MAXLEN;
22 cm_param.read_buf_size = CONFIG_MQTT_RX_MAXLEN;
23 #if defined(COAP_COMM_ENABLED) && !defined(MQTT_COMM_ENABLED)
24 cm_param.protocol_type = IOTX_CM_PROTOCOL_TYPE_COAP;
25 #else
26 cm_param.protocol_type = IOTX_CM_PROTOCOL_TYPE_MQTT;
27 #endif
28 cm_param.handle_event = dm_client_event_handle;
29
30 res = iotx_cm_open(&cm_param);
31
32 if (res < SUCCESS_RETURN) {
33 return res;
34 }
35 ctx->fd = res;
36
37 dm_log_info("CM Fd: %d", ctx->fd);
38
39 return SUCCESS_RETURN;
40 }
41
dm_client_connect(int timeout_ms)42 int dm_client_connect(int timeout_ms)
43 {
44 int res = 0;
45 dm_client_ctx_t *ctx = dm_client_get_ctx();
46
47 res = iotx_cm_connect(ctx->fd, timeout_ms);
48 if (res < SUCCESS_RETURN) {
49 return res;
50 }
51
52 return SUCCESS_RETURN;
53 }
54
dm_client_close(void)55 int dm_client_close(void)
56 {
57 dm_client_ctx_t *ctx = dm_client_get_ctx();
58
59 return iotx_cm_close(ctx->fd);
60 }
61
dm_client_subscribe(char * uri,iotx_cm_data_handle_cb callback,void * context)62 int dm_client_subscribe(char *uri, iotx_cm_data_handle_cb callback,
63 void *context)
64 {
65 int res = 0;
66 uint8_t local_sub = 0;
67 dm_client_ctx_t *ctx = dm_client_get_ctx();
68 iotx_cm_ext_params_t sub_params;
69
70 memset(&sub_params, 0, sizeof(iotx_cm_ext_params_t));
71 if (context != NULL) {
72 local_sub = *((uint8_t *)context);
73 }
74
75 if (local_sub == 1) {
76 sub_params.ack_type = IOTX_CM_MESSAGE_SUB_LOCAL;
77 sub_params.sync_mode = IOTX_CM_ASYNC;
78 } else {
79 sub_params.ack_type = IOTX_CM_MESSAGE_NO_ACK;
80 sub_params.sync_mode = IOTX_CM_SYNC;
81 }
82
83 sub_params.sync_timeout = IOTX_DM_CLIENT_SUB_TIMEOUT_MS;
84 sub_params.ack_cb = NULL;
85
86 res = iotx_cm_sub(ctx->fd, &sub_params, (const char *)uri, callback, NULL);
87 dm_log_info("Subscribe Result: %d", res);
88
89 if (res < SUCCESS_RETURN) {
90 return res;
91 }
92
93 return SUCCESS_RETURN;
94 }
95
dm_client_unsubscribe(char * uri)96 int dm_client_unsubscribe(char *uri)
97 {
98 int res = 0;
99 dm_client_ctx_t *ctx = dm_client_get_ctx();
100
101 res = iotx_cm_unsub(ctx->fd, uri);
102
103 dm_log_info("Unsubscribe Result: %d", res);
104
105 return res;
106 }
107
dm_client_publish(char * uri,unsigned char * payload,int payload_len,iotx_cm_data_handle_cb callback)108 int dm_client_publish(char *uri, unsigned char *payload, int payload_len,
109 iotx_cm_data_handle_cb callback)
110 {
111 int res = 0;
112 char *pub_uri = NULL;
113 dm_client_ctx_t *ctx = dm_client_get_ctx();
114 iotx_cm_ext_params_t pub_param;
115
116 memset(&pub_param, 0, sizeof(iotx_cm_ext_params_t));
117 pub_param.ack_type = IOTX_CM_MESSAGE_NO_ACK;
118 pub_param.sync_mode = IOTX_CM_ASYNC;
119 pub_param.sync_timeout = 0;
120 pub_param.ack_cb = NULL;
121
122 #if defined(COAP_COMM_ENABLED) && !defined(MQTT_COMM_ENABLED)
123 pub_param.ack_cb = callback;
124 res = dm_utils_uri_add_prefix("/topic", uri, &pub_uri);
125 if (res < SUCCESS_RETURN) {
126 return FAIL_RETURN;
127 }
128 #else
129 pub_uri = uri;
130 #endif
131
132 res = iotx_cm_pub(ctx->fd, &pub_param, (const char *)pub_uri,
133 (const char *)payload, (unsigned int)payload_len);
134 dm_log_info("Publish Result: %d", res);
135
136 #if defined(COAP_COMM_ENABLED) && !defined(MQTT_COMM_ENABLED)
137 DM_free(pub_uri);
138 #endif
139
140 return res;
141 }
142
dm_client_yield(unsigned int timeout)143 int dm_client_yield(unsigned int timeout)
144 {
145 dm_client_ctx_t *ctx = dm_client_get_ctx();
146
147 return iotx_cm_yield(ctx->fd, timeout);
148 }
149
dm_client_user_sub_request(int fd,const char * topic,const char * payload,unsigned int payload_len,void * context)150 void dm_client_user_sub_request(int fd, const char *topic, const char *payload,
151 unsigned int payload_len, void *context)
152 {
153 dm_msg_source_t source;
154
155 memset(&source, 0, sizeof(dm_msg_source_t));
156
157 source.uri = topic;
158 source.payload = (unsigned char *)payload;
159 source.payload_len = payload_len;
160 source.context = NULL;
161
162 dm_msg_proc_thing_model_user_sub(&source);
163 }
164