1 #include "iotx_dm_internal.h"
2 
3 static dm_client_ctx_t g_dm_client_ctx = { 0 };
4 
dm_client_get_ctx(void)5 static dm_client_ctx_t *dm_client_get_ctx(void)
6 {
7     return &g_dm_client_ctx;
8 }
9 
dm_client_open(void)10 int dm_client_open(void)
11 {
12     int res = 0;
13     dm_client_ctx_t *ctx = dm_client_get_ctx();
14     iotx_cm_init_param_t cm_param;
15 
16     memset(ctx, 0, sizeof(dm_client_ctx_t));
17     memset(&cm_param, 0, sizeof(iotx_cm_init_param_t));
18 
19     cm_param.request_timeout_ms = IOTX_DM_CLIENT_REQUEST_TIMEOUT_MS;
20     cm_param.keepalive_interval_ms = IOTX_DM_CLIENT_KEEPALIVE_INTERVAL_MS;
21     cm_param.write_buf_size = CONFIG_MQTT_TX_MAXLEN;
22     cm_param.read_buf_size = CONFIG_MQTT_RX_MAXLEN;
23 #if defined(COAP_COMM_ENABLED) && !defined(MQTT_COMM_ENABLED)
24     cm_param.protocol_type = IOTX_CM_PROTOCOL_TYPE_COAP;
25 #else
26     cm_param.protocol_type = IOTX_CM_PROTOCOL_TYPE_MQTT;
27 #endif
28     cm_param.handle_event = dm_client_event_handle;
29 
30     res = iotx_cm_open(&cm_param);
31 
32     if (res < SUCCESS_RETURN) {
33         return res;
34     }
35     ctx->fd = res;
36 
37     dm_log_info("CM Fd: %d", ctx->fd);
38 
39     return SUCCESS_RETURN;
40 }
41 
dm_client_connect(int timeout_ms)42 int dm_client_connect(int timeout_ms)
43 {
44     int res = 0;
45     dm_client_ctx_t *ctx = dm_client_get_ctx();
46 
47     res = iotx_cm_connect(ctx->fd, timeout_ms);
48     if (res < SUCCESS_RETURN) {
49         return res;
50     }
51 
52     return SUCCESS_RETURN;
53 }
54 
dm_client_close(void)55 int dm_client_close(void)
56 {
57     dm_client_ctx_t *ctx = dm_client_get_ctx();
58 
59     return iotx_cm_close(ctx->fd);
60 }
61 
/**
 * @brief Subscribe to a URI through the CM handle of the client context.
 *
 * @param uri      Topic/URI passed to iotx_cm_sub().
 * @param callback Data handler passed to iotx_cm_sub().
 * @param context  Optional flag: when non-NULL, its first byte is read. A
 *                 value of 1 selects IOTX_CM_MESSAGE_SUB_LOCAL with
 *                 IOTX_CM_ASYNC; any other value (or NULL) selects
 *                 IOTX_CM_MESSAGE_NO_ACK with IOTX_CM_SYNC. The pointer
 *                 itself is not forwarded; iotx_cm_sub() receives NULL as
 *                 its context argument.
 *
 * @return The iotx_cm_sub() result when it is below SUCCESS_RETURN;
 *         SUCCESS_RETURN otherwise.
 */
int dm_client_subscribe(char *uri, iotx_cm_data_handle_cb callback,
                        void *context)
{
    iotx_cm_ext_params_t params;
    dm_client_ctx_t *client = dm_client_get_ctx();
    uint8_t is_local = (context != NULL) ? *((uint8_t *)context) : 0;
    int rc = 0;

    memset(&params, 0, sizeof(params));
    params.ack_cb = NULL;
    params.sync_timeout = IOTX_DM_CLIENT_SUB_TIMEOUT_MS;

    if (is_local != 1) {
        params.ack_type = IOTX_CM_MESSAGE_NO_ACK;
        params.sync_mode = IOTX_CM_SYNC;
    } else {
        params.ack_type = IOTX_CM_MESSAGE_SUB_LOCAL;
        params.sync_mode = IOTX_CM_ASYNC;
    }

    rc = iotx_cm_sub(client->fd, &params, (const char *)uri, callback, NULL);
    dm_log_info("Subscribe Result: %d", rc);

    /* Any non-failing result is normalised to SUCCESS_RETURN. */
    return (rc < SUCCESS_RETURN) ? rc : SUCCESS_RETURN;
}
95 
dm_client_unsubscribe(char * uri)96 int dm_client_unsubscribe(char *uri)
97 {
98     int res = 0;
99     dm_client_ctx_t *ctx = dm_client_get_ctx();
100 
101     res = iotx_cm_unsub(ctx->fd, uri);
102 
103     dm_log_info("Unsubscribe Result: %d", res);
104 
105     return res;
106 }
107 
/**
 * @brief Publish a payload to a URI through the CM handle of the client
 *        context.
 *
 * The publish is issued with IOTX_CM_MESSAGE_NO_ACK / IOTX_CM_ASYNC and a
 * zero sync timeout. In a CoAP-only build (COAP_COMM_ENABLED defined and
 * MQTT_COMM_ENABLED not defined) the URI is first prefixed with "/topic" via
 * dm_utils_uri_add_prefix(), `callback` is installed as the ack callback, and
 * the prefixed URI is released with DM_free() after publishing. In every
 * other build `uri` is used as-is and `callback` is not used.
 *
 * @param uri         Topic/URI to publish to.
 * @param payload     Payload bytes, passed on as `const char *`.
 * @param payload_len Payload length; cast to `unsigned int` for iotx_cm_pub().
 * @param callback    Ack callback (CoAP-only build, see above).
 *
 * @return The iotx_cm_pub() result, unmodified; FAIL_RETURN when building the
 *         prefixed URI fails (CoAP-only build).
 */
int dm_client_publish(char *uri, unsigned char *payload, int payload_len,
                      iotx_cm_data_handle_cb callback)
{
    iotx_cm_ext_params_t params;
    dm_client_ctx_t *client = dm_client_get_ctx();
    char *target = NULL;
    int rc = 0;

    memset(&params, 0, sizeof(params));
    params.sync_mode = IOTX_CM_ASYNC;
    params.sync_timeout = 0;
    params.ack_type = IOTX_CM_MESSAGE_NO_ACK;
    params.ack_cb = NULL;

#if !defined(COAP_COMM_ENABLED) || defined(MQTT_COMM_ENABLED)
    target = uri;
#else
    params.ack_cb = callback;
    rc = dm_utils_uri_add_prefix("/topic", uri, &target);
    if (rc < SUCCESS_RETURN) {
        return FAIL_RETURN;
    }
#endif

    rc = iotx_cm_pub(client->fd, &params, (const char *)target,
                     (const char *)payload, (unsigned int)payload_len);
    dm_log_info("Publish Result: %d", rc);

#if !(!defined(COAP_COMM_ENABLED) || defined(MQTT_COMM_ENABLED))
    /* The prefixed URI was produced by dm_utils_uri_add_prefix(); free it. */
    DM_free(target);
#endif

    return rc;
}
142 
dm_client_yield(unsigned int timeout)143 int dm_client_yield(unsigned int timeout)
144 {
145     dm_client_ctx_t *ctx = dm_client_get_ctx();
146 
147     return iotx_cm_yield(ctx->fd, timeout);
148 }
149 
dm_client_user_sub_request(int fd,const char * topic,const char * payload,unsigned int payload_len,void * context)150 void dm_client_user_sub_request(int fd, const char *topic, const char *payload,
151                                 unsigned int payload_len, void *context)
152 {
153     dm_msg_source_t source;
154 
155     memset(&source, 0, sizeof(dm_msg_source_t));
156 
157     source.uri = topic;
158     source.payload = (unsigned char *)payload;
159     source.payload_len = payload_len;
160     source.context = NULL;
161 
162     dm_msg_proc_thing_model_user_sub(&source);
163 }
164