1 /*
2  * Copyright (C) 2016 FUJITSU LIMITED
3  * Author: Yang Hongyang <hongyang.yang@easystack.cn>
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU Lesser General Public License as published
7  * by the Free Software Foundation; version 2.1 only. with the special
8  * exception on linking described in file LICENSE.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU Lesser General Public License for more details.
14  */
15 
16 #include "libxl_osdeps.h" /* must come before any other headers */
17 
18 #include "libxl_internal.h"
19 
20 #include <netlink/netlink.h>
21 #include <arpa/inet.h>
22 #include <sys/socket.h>
23 #include <netinet/in.h>
24 
/*
 * Netlink protocol number of the COLO channel; it has to agree with the
 * number the kernel-side COLO proxy registers.
 */
#define NETLINK_COLO 28

/* Default receive timeout for proxy replies, in microseconds. */
#define COLO_DEFAULT_WAIT_TIME 500000

/* Netlink message types exchanged with the kernel COLO proxy. */
enum colo_netlink_op {
    COLO_QUERY_CHECKPOINT = NLMSG_MIN_TYPE + 1,
    COLO_CHECKPOINT       = NLMSG_MIN_TYPE + 2,
    COLO_FAILOVER         = NLMSG_MIN_TYPE + 3,
    COLO_PROXY_INIT       = NLMSG_MIN_TYPE + 4,
    COLO_PROXY_RESET      = NLMSG_MIN_TYPE + 5, /* UNUSED, will be used for continuous FT */
};
36 
37 /* ========= colo-proxy: helper functions ========== */
38 
/*
 * Send a header-only netlink request of @type to destination port id 0
 * (the kernel COLO proxy), using cps->index as our port id.
 * A COLO_PROXY_INIT request additionally asks for an ACK.
 *
 * @buff and @size are not used: only a bare nlmsghdr is sent.
 *
 * Returns the sendmsg() result; a result <= 0 is logged as an error.
 */
static int colo_proxy_send(libxl__colo_proxy_state *cps, uint8_t *buff,
                           uint64_t size, int type)
{
    struct sockaddr_nl dest = {
        .nl_family = AF_NETLINK,
        .nl_pid = 0,
        .nl_groups = 0,
    };
    struct nlmsghdr hdr = {
        .nlmsg_len = NLMSG_SPACE(0),
        .nlmsg_type = type,
        .nlmsg_flags = NLM_F_REQUEST |
                       (type == COLO_PROXY_INIT ? NLM_F_ACK : 0),
        .nlmsg_seq = 0,
        .nlmsg_pid = cps->index,
    };
    struct iovec vec = {
        .iov_base = &hdr,
        .iov_len = hdr.nlmsg_len,
    };
    struct msghdr packet = {
        .msg_name = &dest,
        .msg_namelen = sizeof(dest),
        .msg_iov = &vec,
        .msg_iovlen = 1,
        .msg_control = NULL,
        .msg_controllen = 0,
        .msg_flags = 0,
    };
    int sent;

    STATE_AO_GC(cps->ao);

    sent = sendmsg(cps->sock_fd, &packet, 0);
    if (sent <= 0)
        LOGD(ERROR, ao->domid, "can't send msg to kernel by netlink: %s",
             strerror(errno));

    return sent;
}
82 
/*
 * Write exactly @len bytes from @buf to the stream socket @fd.
 * EINTR and short writes are retried.
 * MSG_NOSIGNAL makes a closed peer surface as an EPIPE error instead of
 * raising SIGPIPE in the calling process.
 * Returns 0 on success, -1 on failure (errno as set by send()).
 */
static int colo_userspace_send_all(int fd, const uint8_t *buf, size_t len)
{
    while (len > 0) {
        ssize_t n = send(fd, buf, len, MSG_NOSIGNAL);

        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return -1;
        buf += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Send one message to the userspace proxy (QEMU colo-compare): a 4-byte
 * big-endian length followed by @size bytes of @buff.
 * Short writes are retried until the whole message has been sent.
 *
 * Returns @size on success and -1 on failure, so callers checking for a
 * negative result also catch partial sends.
 */
static int colo_userspace_proxy_send(libxl__colo_proxy_state *cps,
                                     uint8_t *buff,
                                     uint32_t size)
{
    uint32_t len = htonl(size);

    if (colo_userspace_send_all(cps->sock_fd, (const uint8_t *)&len,
                                sizeof(len)) < 0)
        return -1;

    if (colo_userspace_send_all(cps->sock_fd, buff, size) < 0)
        return -1;

    return (int)size;
}
104 
/*
 * Read exactly @len bytes from the stream socket @fd into @buf.
 * EINTR and short reads are retried.
 * Returns 0 on success and -1 on failure. On failure errno is either
 * left as set by recv() (e.g. EAGAIN/EWOULDBLOCK when SO_RCVTIMEO
 * expires), or set to ECONNRESET if the peer closed the connection
 * before @len bytes arrived.
 */
static int colo_userspace_recv_all(int fd, void *buf, size_t len)
{
    uint8_t *p = buf;

    while (len > 0) {
        ssize_t n = recv(fd, p, len, 0);

        if (n < 0 && errno == EINTR)
            continue;
        if (n < 0)
            return -1;
        if (n == 0) {
            errno = ECONNRESET;
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Receive one message from the userspace proxy (QEMU colo-compare): a
 * 4-byte big-endian length followed by that many payload bytes.
 *
 * @buff:       destination buffer. It must hold at least 1024 bytes; both
 *              callers in this file pass a char[1024]. On success the
 *              payload is NUL-terminated, so payloads of 1024 bytes or
 *              more are rejected rather than overflowing @buff.
 * @timeout_us: receive timeout in microseconds, applied via SO_RCVTIMEO
 *              and cleared again before returning. 0 leaves the socket's
 *              timeout untouched.
 *
 * Returns the payload length (>= 0) on success. Returns -1 on error,
 * timeout, peer close, or an oversized message (errno = EMSGSIZE; the
 * stream is then left mid-message).
 */
static int colo_userspace_proxy_recv(libxl__colo_proxy_state *cps,
                                     char *buff,
                                     unsigned int timeout_us)
{
    /* Capacity of @buff (see above); one byte is reserved for the NUL. */
    enum { COLO_USERSPACE_RECV_BUFF_SIZE = 1024 };
    struct timeval tv;
    uint32_t len = 0;
    uint32_t size;
    int ret = -1;
    int saved_errno;

    STATE_AO_GC(cps->ao);

    if (timeout_us) {
        tv.tv_sec = timeout_us / 1000000;
        tv.tv_usec = timeout_us % 1000000;
        if (setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv,
                       sizeof(tv)) < 0) {
            LOGD(ERROR, ao->domid,
                 "colo_userspace_proxy_recv setsockopt error: %s",
                 strerror(errno));
        }
    }

    if (colo_userspace_recv_all(cps->sock_fd, &len, sizeof(len)) < 0)
        goto out;

    /* The length comes from the peer: bound it before writing to @buff. */
    size = ntohl(len);
    if (size >= COLO_USERSPACE_RECV_BUFF_SIZE) {
        LOGD(ERROR, ao->domid,
             "colo_userspace_proxy_recv: message of %u bytes is too large",
             (unsigned int)size);
        errno = EMSGSIZE;
        goto out;
    }

    if (colo_userspace_recv_all(cps->sock_fd, buff, size) < 0)
        goto out;

    buff[size] = '\0';
    ret = (int)size;

out:
    if (timeout_us) {
        /* Keep errno intact for callers that report strerror(errno). */
        saved_errno = errno;
        tv.tv_sec = 0;
        tv.tv_usec = 0;
        setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
        errno = saved_errno;
    }
    return ret;
}
139 
140 /* error: return -1, otherwise return 0 */
colo_proxy_recv(libxl__colo_proxy_state * cps,uint8_t ** buff,unsigned int timeout_us)141 static int64_t colo_proxy_recv(libxl__colo_proxy_state *cps, uint8_t **buff,
142                                unsigned int timeout_us)
143 {
144     struct sockaddr_nl sa;
145     struct iovec iov;
146     struct msghdr mh = {
147         .msg_name = &sa,
148         .msg_namelen = sizeof(sa),
149         .msg_iov = &iov,
150         .msg_iovlen = 1,
151     };
152     struct timeval tv;
153     uint32_t size = 16384;
154     int64_t len = 0;
155     int ret;
156 
157     STATE_AO_GC(cps->ao);
158     uint8_t *tmp = libxl__malloc(NOGC, size);
159 
160     if (timeout_us) {
161         tv.tv_sec = timeout_us / 1000000;
162         tv.tv_usec = timeout_us % 1000000;
163         setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
164     }
165 
166     iov.iov_base = tmp;
167     iov.iov_len = size;
168 next:
169     ret = recvmsg(cps->sock_fd, &mh, 0);
170     if (ret <= 0) {
171         if (errno != EAGAIN && errno != EWOULDBLOCK)
172             LOGED(ERROR, ao->domid, "can't recv msg from kernel by netlink");
173         goto err;
174     }
175 
176     len += ret;
177     if (mh.msg_flags & MSG_TRUNC) {
178         size += 16384;
179         tmp = libxl__realloc(NOGC, tmp, size);
180         iov.iov_base = tmp + len;
181         iov.iov_len = size - len;
182         goto next;
183     }
184 
185     *buff = tmp;
186     ret = len;
187     goto out;
188 
189 err:
190     free(tmp);
191     *buff = NULL;
192 
193 out:
194     if (timeout_us) {
195         tv.tv_sec = 0;
196         tv.tv_usec = 0;
197         setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
198     }
199     return ret;
200 }
201 
202 /* ========= colo-proxy: setup and teardown ========== */
203 
/*
 * Open the channel to the COLO proxy and perform the initial handshake.
 *
 * Userspace proxy mode:
 *  - connect over TCP to cps->checkpoint_host:cps->checkpoint_port;
 *  - send "COLO_USERSPACE_PROXY_INIT";
 *  - wait up to COLO_DEFAULT_WAIT_TIME microseconds for a reply.
 *
 * Kernel proxy mode:
 *  - open a NETLINK_COLO socket;
 *  - bind it to the first free port id in 1..9, recorded in cps->index;
 *  - send COLO_PROXY_INIT (which requests an ACK) and check the reply
 *    for an error.
 *
 * On success cps->sock_fd holds the open socket and 0 is returned.
 * On failure any socket opened here is closed, cps->sock_fd is set to -1
 * and ERROR_FAIL is returned.
 */
int colo_proxy_setup(libxl__colo_proxy_state *cps)
{
    int skfd = -1;   /* socket opened by this call; only this one is closed */
    struct sockaddr_nl sa;
    struct nlmsghdr *h;
    int i;
    int ret = ERROR_FAIL;
    uint8_t *buff = NULL;
    int64_t size;

    STATE_AO_GC(cps->ao);

    /* If enable userspace proxy mode, we don't need setup kernel proxy */
    if (cps->is_userspace_proxy) {
        struct sockaddr_in addr;
        int port;
        char recvbuff[1024];
        const char sendbuf[] = "COLO_USERSPACE_PROXY_INIT";

        memset(&addr, 0, sizeof(addr));
        port = atoi(cps->checkpoint_port);
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(cps->checkpoint_host);

        skfd = socket(AF_INET, SOCK_STREAM, 0);
        if (skfd < 0) {
            LOGD(ERROR, ao->domid, "can not create a TCP socket: %s",
                 strerror(errno));
            goto out;
        }

        cps->sock_fd = skfd;

        if (connect(skfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
            LOGD(ERROR, ao->domid, "connect error");
            goto out;
        }

        if (colo_userspace_proxy_send(cps, (uint8_t *)sendbuf,
                                      strlen(sendbuf)) < 0)
            goto out;

        if (colo_userspace_proxy_recv(cps, recvbuff,
                                      COLO_DEFAULT_WAIT_TIME) < 0) {
            LOGD(ERROR, ao->domid, "Can't recv msg from qemu colo-compare: %s",
                 strerror(errno));
            goto out;
        }

        ret = 0;
        goto out;
    }

    skfd = socket(PF_NETLINK, SOCK_RAW, NETLINK_COLO);
    if (skfd < 0) {
        LOGD(ERROR, ao->domid, "can not create a netlink socket: %s", strerror(errno));
        goto out;
    }
    cps->sock_fd = skfd;
    memset(&sa, 0, sizeof(sa));
    sa.nl_family = AF_NETLINK;
    sa.nl_groups = 0;

    /* Bind to the first free port id in 1..9; it doubles as the COLO index. */
    for (i = 1; ; i++) {
        if (i >= 10) {
            LOGD(ERROR, ao->domid, "netlink bind error");
            goto out;
        }
        sa.nl_pid = i;
        if (bind(skfd, (struct sockaddr *)&sa, sizeof(sa)) == 0)
            break;
        if (errno != EADDRINUSE) {
            LOGD(ERROR, ao->domid, "netlink bind error");
            goto out;
        }
        LOGD(ERROR, ao->domid, "colo index %u has already in used", sa.nl_pid);
    }

    cps->index = sa.nl_pid;
    if (colo_proxy_send(cps, NULL, 0, COLO_PROXY_INIT) < 0)
        goto out;

    /* receive ack */
    size = colo_proxy_recv(cps, &buff, COLO_DEFAULT_WAIT_TIME);
    if (size < 0) {
        LOGD(ERROR, ao->domid, "Can't recv msg from kernel by netlink: %s",
             strerror(errno));
        goto out;
    }

    if (size) {
        h = (struct nlmsghdr *)buff;

        /* Validate the received length before touching any header field. */
        if ((uint64_t)size < sizeof(*h)) {
            LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
            goto out;
        }

        if (h->nlmsg_type == NLMSG_ERROR) {
            /* ack's type is NLMSG_ERROR; a zero error code means success */
            struct nlmsgerr *err = (struct nlmsgerr *)NLMSG_DATA(h);

            if ((uint64_t)size < NLMSG_LENGTH(sizeof(*err))) {
                LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
                goto out;
            }

            if (err->error) {
                LOGD(ERROR, ao->domid, "NLMSG_ERROR contains error %d", err->error);
                goto out;
            }
        }
    }

    ret = 0;

out:
    free(buff);
    if (ret) {
        /* Close only the socket this call opened, never a stale fd. */
        if (skfd >= 0)
            close(skfd);
        cps->sock_fd = -1;
    }
    return ret;
}
324 
void colo_proxy_teardown(libxl__colo_proxy_state *cps)
{
    /*
     * Release the socket opened by colo_proxy_setup(): the netlink socket
     * in kernel proxy mode, or the TCP connection to QEMU colo-compare in
     * userspace proxy mode. Userspace mode has no kernel proxy state to
     * undo, but its socket must still be closed or the fd leaks.
     */
    if (cps->sock_fd >= 0) {
        close(cps->sock_fd);
        cps->sock_fd = -1;
    }
}
339 
340 /* ========= colo-proxy: preresume, postresume and checkpoint ========== */
341 
void colo_proxy_preresume(libxl__colo_proxy_state *cps)
{
    /*
     * Userspace proxy mode: there is no kernel proxy to notify; send the
     * "COLO_CHECKPOINT" message to QEMU colo-compare instead.
     */
    if (cps->is_userspace_proxy) {
        STATE_AO_GC(cps->ao);
        const char sendbuf[] = "COLO_CHECKPOINT";
        const size_t len = strlen(sendbuf);
        int ret = colo_userspace_proxy_send(cps, (uint8_t *)sendbuf, len);

        /*
         * This function cannot report failure to its caller, so at least
         * log it. A complete send returns the payload length.
         */
        if (ret < 0 || (size_t)ret != len)
            LOGD(ERROR, ao->domid,
                 "can't send COLO_CHECKPOINT to qemu colo-compare");
        return;
    }

    /*
     * TODO: propagate failure to the caller; colo_proxy_send() already
     * logs it.
     */
    colo_proxy_send(cps, NULL, 0, COLO_CHECKPOINT);
}
359 
void colo_proxy_postresume(libxl__colo_proxy_state *cps)
{
    /*
     * Intentionally empty: the proxy needs no action after resume in
     * either mode.
     */
    (void)cps;
}
364 
/*
 * Payload of a netlink message from the kernel COLO proxy, read through
 * NLMSG_DATA() by colo_proxy_checkpoint(). Its layout presumably has to
 * match the kernel-side definition (confirm against the kernel module).
 */
struct colo_msg {
    bool is_checkpoint;
};
typedef struct colo_msg colo_msg;
368 
369 /*
370  * Return value:
371  * -1: error
372  *  0: no checkpoint event is received before timeout
373  *  1: do checkpoint
374  */
colo_proxy_checkpoint(libxl__colo_proxy_state * cps,unsigned int timeout_us)375 int colo_proxy_checkpoint(libxl__colo_proxy_state *cps,
376                           unsigned int timeout_us)
377 {
378     uint8_t *buff = NULL;
379     int64_t size;
380     struct nlmsghdr *h;
381     struct colo_msg *m;
382     int ret = -1;
383     char recvbuff[1024];
384 
385     STATE_AO_GC(cps->ao);
386 
387     /*
388      * Enable userspace proxy to periodical checkpoint mode,
389      * sleeping temporarily for colo userspace proxy mode.
390      * then we will use socket recv instead of this usleep.
391      * In other words, we use socket communicate with Qemu
392      * Proxy part(colo-compare), for example, notify checkpoint
393      * event.
394      */
395     if (cps->is_userspace_proxy) {
396         ret = colo_userspace_proxy_recv(cps, recvbuff, timeout_us);
397         if (ret <= 0) {
398             ret = 0;
399             goto out;
400         }
401 
402         if (!strcmp(recvbuff, "DO_CHECKPOINT")) {
403             ret = 1;
404         } else {
405             LOGD(ERROR, ao->domid, "receive qemu colo-compare checkpoint error");
406             ret = 0;
407         }
408         goto out;
409     }
410 
411     size = colo_proxy_recv(cps, &buff, timeout_us);
412 
413     /* timeout, return no checkpoint message. */
414     if (size <= 0) {
415         ret = 0;
416         goto out;
417     }
418 
419     h = (struct nlmsghdr *) buff;
420 
421     if (h->nlmsg_type == NLMSG_ERROR) {
422         LOGD(ERROR, ao->domid, "receive NLMSG_ERROR");
423         goto out;
424     }
425 
426     if (h->nlmsg_len < NLMSG_LENGTH(sizeof(*m))) {
427         LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
428         goto out;
429     }
430 
431     m = NLMSG_DATA(h);
432 
433     ret = m->is_checkpoint ? 1 : 0;
434 
435 out:
436     free(buff);
437     return ret;
438 }
439