1 /*
2 * Copyright (C) 2016 FUJITSU LIMITED
3 * Author: Yang Hongyang <hongyang.yang@easystack.cn>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU Lesser General Public License as published
7 * by the Free Software Foundation; version 2.1 only. with the special
8 * exception on linking described in file LICENSE.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU Lesser General Public License for more details.
14 */
15
16 #include "libxl_osdeps.h" /* must come before any other headers */
17
18 #include "libxl_internal.h"
19
20 #include <netlink/netlink.h>
21 #include <arpa/inet.h>
22 #include <sys/socket.h>
23 #include <netinet/in.h>
24
/*
 * Netlink protocol number of the COLO proxy channel.  Must stay consistent
 * with the COLO netlink channel on the kernel side.
 */
#define NETLINK_COLO 28

/* Default receive timeout for proxy handshakes, in microseconds (0.5 s). */
#define COLO_DEFAULT_WAIT_TIME 500000

/*
 * Message types exchanged with the kernel COLO proxy.  They are numbered
 * upwards from NLMSG_MIN_TYPE + 1 and must match the kernel side.
 */
enum colo_netlink_op {
    COLO_QUERY_CHECKPOINT = NLMSG_MIN_TYPE + 1,
    COLO_CHECKPOINT       = NLMSG_MIN_TYPE + 2,
    COLO_FAILOVER         = NLMSG_MIN_TYPE + 3,
    COLO_PROXY_INIT       = NLMSG_MIN_TYPE + 4,
    COLO_PROXY_RESET      = NLMSG_MIN_TYPE + 5, /* UNUSED, will be used for continuous FT */
};
36
37 /* ========= colo-proxy: helper functions ========== */
38
/*
 * Send a header-only COLO netlink request of the given type to the kernel.
 *
 * Only an nlmsghdr is transmitted; buff and size are currently ignored.
 * The header carries cps->index as nlmsg_pid.  COLO_PROXY_INIT requests
 * also set NLM_F_ACK.
 *
 * Returns the sendmsg() result: the number of bytes sent, or -1 with errno
 * set.  A non-positive result is logged.
 */
static int colo_proxy_send(libxl__colo_proxy_state *cps, uint8_t *buff,
                           uint64_t size, int type)
{
    struct sockaddr_nl dest;
    struct nlmsghdr hdr;
    struct iovec vec;
    struct msghdr packet;
    int sent;

    STATE_AO_GC(cps->ao);

    /* Destination address: netlink port id 0, no multicast groups. */
    memset(&dest, 0, sizeof(dest));
    dest.nl_family = AF_NETLINK;
    dest.nl_pid = 0;
    dest.nl_groups = 0;

    hdr.nlmsg_len = NLMSG_SPACE(0);
    hdr.nlmsg_type = type;
    hdr.nlmsg_flags = (type == COLO_PROXY_INIT) ? (NLM_F_REQUEST | NLM_F_ACK)
                                                : NLM_F_REQUEST;
    hdr.nlmsg_seq = 0;
    hdr.nlmsg_pid = cps->index;

    vec.iov_base = &hdr;
    vec.iov_len = hdr.nlmsg_len;

    packet.msg_name = &dest;
    packet.msg_namelen = sizeof(dest);
    packet.msg_iov = &vec;
    packet.msg_iovlen = 1;
    packet.msg_control = NULL;
    packet.msg_controllen = 0;
    packet.msg_flags = 0;

    sent = sendmsg(cps->sock_fd, &packet, 0);
    if (sent <= 0)
        LOGD(ERROR, ao->domid, "can't send msg to kernel by netlink: %s",
             strerror(errno));

    return sent;
}
82
/*
 * Write all len bytes of buf to the stream socket fd.  Short writes are
 * resumed and EINTR is retried.
 * Returns 0 on success, or -1 if send() fails or makes no progress.
 */
static int colo_userspace_proxy_send_all(int fd, const uint8_t *buf,
                                         size_t len)
{
    while (len > 0) {
        ssize_t n = send(fd, buf, len, 0);

        if (n < 0 && errno == EINTR)
            continue;
        if (n <= 0)
            return -1;

        buf += n;
        len -= (size_t)n;
    }

    return 0;
}

/*
 * Send one framed message to the QEMU colo-compare userspace proxy.  The
 * frame is a 4-byte big-endian (htonl) length followed by size bytes of buff.
 *
 * A stream socket may accept fewer bytes than requested, so both parts are
 * written with colo_userspace_proxy_send_all().
 *
 * Returns size on success.  Returns -1 if either part could not be sent
 * completely; a partial count is never reported as success.
 */
static int colo_userspace_proxy_send(libxl__colo_proxy_state *cps,
                                     uint8_t *buff,
                                     uint32_t size)
{
    uint32_t len = htonl(size);

    if (colo_userspace_proxy_send_all(cps->sock_fd, (const uint8_t *)&len,
                                      sizeof(len)) < 0)
        return -1;

    if (colo_userspace_proxy_send_all(cps->sock_fd, buff, size) < 0)
        return -1;

    return (int)size;
}
104
/*
 * Receive one framed message from the QEMU colo-compare userspace proxy.
 * The frame is a 4-byte big-endian length followed by that many payload
 * bytes.
 *
 * buff must point to at least 1024 bytes.  The peer chooses the length, so
 * payloads that would not fit together with a terminating NUL are rejected
 * rather than written past the end of buff.  On success the payload is
 * NUL-terminated, so callers may treat it as a C string.
 *
 * If timeout_us is non-zero, it is installed as SO_RCVTIMEO on the socket.
 * It is not reset afterwards, so it also applies to later receives on the
 * socket.
 *
 * Returns:
 *   >= 0  payload length on success;
 *      0  also returned when the peer closes the connection before sending
 *         a length prefix;
 *     -1  on recv() error or timeout (errno from recv()), on a short read
 *         (errno EPROTO), or on an oversized message (errno EMSGSIZE).
 */
static int colo_userspace_proxy_recv(libxl__colo_proxy_state *cps,
                                     char *buff,
                                     unsigned int timeout_us)
{
    /* Capacity every caller's buff must provide, including the NUL. */
    enum { RECV_BUFF_SIZE = 1024 };
    struct timeval tv;
    ssize_t n;
    uint32_t len = 0;
    uint32_t size;

    STATE_AO_GC(cps->ao);

    if (timeout_us) {
        tv.tv_sec = timeout_us / 1000000;
        tv.tv_usec = timeout_us % 1000000;
        if (setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv,
                       sizeof(tv)) < 0) {
            LOGD(ERROR, ao->domid,
                 "colo_userspace_proxy_recv setsockopt error: %s",
                 strerror(errno));
        }
    }

    /* TCP may deliver the frame in pieces; MSG_WAITALL waits for all of it. */
    n = recv(cps->sock_fd, &len, sizeof(len), MSG_WAITALL);
    if (n <= 0)
        return (int)n;  /* -1: error/timeout, 0: connection closed */
    if ((size_t)n != sizeof(len)) {
        LOGD(ERROR, ao->domid, "short read of colo-compare message length");
        errno = EPROTO;
        return -1;
    }

    size = ntohl(len);
    if (size >= RECV_BUFF_SIZE) {
        LOGD(ERROR, ao->domid, "colo-compare message too large: %u bytes",
             (unsigned int)size);
        errno = EMSGSIZE;
        return -1;
    }

    n = recv(cps->sock_fd, buff, size, MSG_WAITALL);
    if (n < 0)
        return -1;
    if ((uint32_t)n != size) {
        LOGD(ERROR, ao->domid, "short read of colo-compare message payload");
        errno = EPROTO;
        return -1;
    }

    buff[n] = '\0';
    return (int)n;
}
139
140 /* error: return -1, otherwise return 0 */
colo_proxy_recv(libxl__colo_proxy_state * cps,uint8_t ** buff,unsigned int timeout_us)141 static int64_t colo_proxy_recv(libxl__colo_proxy_state *cps, uint8_t **buff,
142 unsigned int timeout_us)
143 {
144 struct sockaddr_nl sa;
145 struct iovec iov;
146 struct msghdr mh = {
147 .msg_name = &sa,
148 .msg_namelen = sizeof(sa),
149 .msg_iov = &iov,
150 .msg_iovlen = 1,
151 };
152 struct timeval tv;
153 uint32_t size = 16384;
154 int64_t len = 0;
155 int ret;
156
157 STATE_AO_GC(cps->ao);
158 uint8_t *tmp = libxl__malloc(NOGC, size);
159
160 if (timeout_us) {
161 tv.tv_sec = timeout_us / 1000000;
162 tv.tv_usec = timeout_us % 1000000;
163 setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
164 }
165
166 iov.iov_base = tmp;
167 iov.iov_len = size;
168 next:
169 ret = recvmsg(cps->sock_fd, &mh, 0);
170 if (ret <= 0) {
171 if (errno != EAGAIN && errno != EWOULDBLOCK)
172 LOGED(ERROR, ao->domid, "can't recv msg from kernel by netlink");
173 goto err;
174 }
175
176 len += ret;
177 if (mh.msg_flags & MSG_TRUNC) {
178 size += 16384;
179 tmp = libxl__realloc(NOGC, tmp, size);
180 iov.iov_base = tmp + len;
181 iov.iov_len = size - len;
182 goto next;
183 }
184
185 *buff = tmp;
186 ret = len;
187 goto out;
188
189 err:
190 free(tmp);
191 *buff = NULL;
192
193 out:
194 if (timeout_us) {
195 tv.tv_sec = 0;
196 tv.tv_usec = 0;
197 setsockopt(cps->sock_fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
198 }
199 return ret;
200 }
201
202 /* ========= colo-proxy: setup and teardown ========== */
203
/*
 * Open and initialise the channel to the COLO proxy described by cps.
 *
 * Userspace proxy mode (cps->is_userspace_proxy):
 *   - connect over TCP to cps->checkpoint_host:cps->checkpoint_port;
 *   - send the "COLO_USERSPACE_PROXY_INIT" handshake;
 *   - wait up to COLO_DEFAULT_WAIT_TIME microseconds for QEMU colo-compare
 *     to answer.
 *
 * Kernel proxy mode:
 *   - open a NETLINK_COLO socket;
 *   - bind it to the first free netlink port id in 1..9 and store that id in
 *     cps->index (it is sent as nlmsg_pid in later requests);
 *   - send COLO_PROXY_INIT;
 *   - if the kernel answers with an NLMSG_ERROR ack, check that the ack
 *     reports success.
 *
 * Returns 0 on success, with the socket left in cps->sock_fd.  On failure,
 * returns ERROR_FAIL, closes any socket opened here and sets cps->sock_fd
 * to -1.
 */
int colo_proxy_setup(libxl__colo_proxy_state *cps)
{
    /* -1 until socket() succeeds, so the out path never closes a stale fd. */
    int skfd = -1;
    struct sockaddr_nl sa;
    struct nlmsghdr *h;
    unsigned int pid;
    int rc = ERROR_FAIL;
    uint8_t *buff = NULL;
    int64_t size;

    STATE_AO_GC(cps->ao);

    /* If enable userspace proxy mode, we don't need setup kernel proxy */
    if (cps->is_userspace_proxy) {
        struct sockaddr_in addr;
        char recvbuff[1024];
        const char sendbuf[] = "COLO_USERSPACE_PROXY_INIT";
        const int sendlen = (int)(sizeof(sendbuf) - 1);
        int port;
        int r;

        memset(&addr, 0, sizeof(addr));
        port = atoi(cps->checkpoint_port);
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        addr.sin_addr.s_addr = inet_addr(cps->checkpoint_host);

        skfd = socket(AF_INET, SOCK_STREAM, 0);
        if (skfd < 0) {
            LOGD(ERROR, ao->domid, "can not create a TCP socket: %s",
                 strerror(errno));
            goto out;
        }
        cps->sock_fd = skfd;

        if (connect(skfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
            LOGD(ERROR, ao->domid, "connect error");
            goto out;
        }

        /* A short send leaves the handshake incomplete: treat it as failure. */
        r = colo_userspace_proxy_send(cps, (uint8_t *)sendbuf, sendlen);
        if (r != sendlen) {
            LOGD(ERROR, ao->domid, "Can't send init msg to qemu colo-compare");
            goto out;
        }

        r = colo_userspace_proxy_recv(cps, recvbuff, COLO_DEFAULT_WAIT_TIME);
        if (r < 0) {
            LOGD(ERROR, ao->domid, "Can't recv msg from qemu colo-compare: %s",
                 strerror(errno));
            goto out;
        }

        rc = 0;
        goto out;
    }

    skfd = socket(PF_NETLINK, SOCK_RAW, NETLINK_COLO);
    if (skfd < 0) {
        LOGD(ERROR, ao->domid, "can not create a netlink socket: %s",
             strerror(errno));
        goto out;
    }
    cps->sock_fd = skfd;

    memset(&sa, 0, sizeof(sa));
    sa.nl_family = AF_NETLINK;
    sa.nl_groups = 0;

    /* Bind to the first netlink port id in 1..9 that is not already taken. */
    for (pid = 1; pid < 10; pid++) {
        sa.nl_pid = pid;
        if (bind(skfd, (struct sockaddr *)&sa, sizeof(sa)) == 0)
            break;
        if (errno != EADDRINUSE) {
            LOGD(ERROR, ao->domid, "netlink bind error");
            goto out;
        }
        LOGD(ERROR, ao->domid, "colo index %u has already in used", sa.nl_pid);
    }
    if (pid == 10) {
        LOGD(ERROR, ao->domid, "netlink bind error");
        goto out;
    }

    cps->index = sa.nl_pid;
    if (colo_proxy_send(cps, NULL, 0, COLO_PROXY_INIT) < 0)
        goto out;

    /* receive ack */
    size = colo_proxy_recv(cps, &buff, COLO_DEFAULT_WAIT_TIME);
    if (size < 0) {
        LOGD(ERROR, ao->domid, "Can't recv msg from kernel by netlink: %s",
             strerror(errno));
        goto out;
    }

    if (size > 0) {
        h = (struct nlmsghdr *)buff;

        /* Never read header fields beyond what was actually received. */
        if (size < (int64_t)sizeof(*h)) {
            LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
            goto out;
        }

        if (h->nlmsg_type == NLMSG_ERROR) {
            /* ack's type is NLMSG_ERROR */
            struct nlmsgerr *err = (struct nlmsgerr *)NLMSG_DATA(h);

            if (size < (int64_t)NLMSG_LENGTH(sizeof(*err))) {
                LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
                goto out;
            }

            if (err->error) {
                LOGD(ERROR, ao->domid, "NLMSG_ERROR contains error %d",
                     err->error);
                goto out;
            }
        }
    }

    rc = 0;

out:
    free(buff);
    if (rc) {
        if (skfd >= 0)
            close(skfd);
        cps->sock_fd = -1;
    }
    return rc;
}
324
/*
 * Release the proxy channel opened by colo_proxy_setup().
 *
 * Both proxy modes keep their socket in cps->sock_fd: the netlink socket
 * in kernel mode, the TCP connection to QEMU colo-compare in userspace
 * mode.  It is closed here in either mode; previously the userspace TCP
 * socket leaked.  sock_fd is then reset to -1, so a second call does
 * nothing.
 */
void colo_proxy_teardown(libxl__colo_proxy_state *cps)
{
    if (cps->sock_fd >= 0) {
        close(cps->sock_fd);
        cps->sock_fd = -1;
    }
}
339
340 /* ========= colo-proxy: preresume, postresume and checkpoint ========== */
341
/*
 * Notify the proxy that a checkpoint is about to be taken.
 *
 * Userspace proxy mode: send the "COLO_CHECKPOINT" frame to QEMU
 * colo-compare.  Kernel proxy mode: send a COLO_CHECKPOINT netlink request.
 *
 * This function returns void and cannot report failures to its caller, so
 * failures are only logged.
 */
void colo_proxy_preresume(libxl__colo_proxy_state *cps)
{
    /*
     * If enable userspace proxy mode,
     * we don't need preresume kernel proxy
     */
    if (cps->is_userspace_proxy) {
        const char sendbuf[] = "COLO_CHECKPOINT";
        const int sendlen = (int)(sizeof(sendbuf) - 1);

        if (colo_userspace_proxy_send(cps, (uint8_t *)sendbuf,
                                      sendlen) != sendlen) {
            STATE_AO_GC(cps->ao);

            LOGD(ERROR, ao->domid,
                 "can't send checkpoint msg to qemu colo-compare");
        }
        return;
    }

    /* colo_proxy_send() logs its own failures. */
    colo_proxy_send(cps, NULL, 0, COLO_CHECKPOINT);
    /* TODO: need to handle if the call fails... */
}
359
/*
 * Post-resume hook for the COLO proxy.  There is nothing to do at this
 * point, so this is intentionally a no-op.
 */
void colo_proxy_postresume(libxl__colo_proxy_state *cps)
{
    (void)cps; /* unused: nothing to do after resume */
}
364
/*
 * Payload carried by a COLO netlink message.  colo_proxy_checkpoint() reads
 * it through NLMSG_DATA().  NOTE(review): the layout presumably has to
 * match the kernel-side definition; confirm before changing it.
 */
struct colo_msg {
    bool is_checkpoint;
};
typedef struct colo_msg colo_msg;
368
369 /*
370 * Return value:
371 * -1: error
372 * 0: no checkpoint event is received before timeout
373 * 1: do checkpoint
374 */
colo_proxy_checkpoint(libxl__colo_proxy_state * cps,unsigned int timeout_us)375 int colo_proxy_checkpoint(libxl__colo_proxy_state *cps,
376 unsigned int timeout_us)
377 {
378 uint8_t *buff = NULL;
379 int64_t size;
380 struct nlmsghdr *h;
381 struct colo_msg *m;
382 int ret = -1;
383 char recvbuff[1024];
384
385 STATE_AO_GC(cps->ao);
386
387 /*
388 * Enable userspace proxy to periodical checkpoint mode,
389 * sleeping temporarily for colo userspace proxy mode.
390 * then we will use socket recv instead of this usleep.
391 * In other words, we use socket communicate with Qemu
392 * Proxy part(colo-compare), for example, notify checkpoint
393 * event.
394 */
395 if (cps->is_userspace_proxy) {
396 ret = colo_userspace_proxy_recv(cps, recvbuff, timeout_us);
397 if (ret <= 0) {
398 ret = 0;
399 goto out;
400 }
401
402 if (!strcmp(recvbuff, "DO_CHECKPOINT")) {
403 ret = 1;
404 } else {
405 LOGD(ERROR, ao->domid, "receive qemu colo-compare checkpoint error");
406 ret = 0;
407 }
408 goto out;
409 }
410
411 size = colo_proxy_recv(cps, &buff, timeout_us);
412
413 /* timeout, return no checkpoint message. */
414 if (size <= 0) {
415 ret = 0;
416 goto out;
417 }
418
419 h = (struct nlmsghdr *) buff;
420
421 if (h->nlmsg_type == NLMSG_ERROR) {
422 LOGD(ERROR, ao->domid, "receive NLMSG_ERROR");
423 goto out;
424 }
425
426 if (h->nlmsg_len < NLMSG_LENGTH(sizeof(*m))) {
427 LOGD(ERROR, ao->domid, "NLMSG_LENGTH is too short");
428 goto out;
429 }
430
431 m = NLMSG_DATA(h);
432
433 ret = m->is_checkpoint ? 1 : 0;
434
435 out:
436 free(buff);
437 return ret;
438 }
439