1 /*
2 * Copyright (C) 2015 Citrix Ltd.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as published
6 * by the Free Software Foundation; version 2.1 only. with the special
7 * exception on linking described in file LICENSE.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Lesser General Public License for more details.
13 */
14
15 #include "libxl_osdeps.h" /* must come before any other headers */
16
17 #include "libxl_internal.h"
18
19 /*
20 * Infrastructure for reading and acting on the contents of a libxl
21 * migration stream. There are a lot of moving parts here.
22 *
23 * The logic revolves around two actions; reading another record from
24 * the stream, and processing the records. The stream_continue()
25 * function is responsible for choosing the next action to perform.
26 *
27 * The exact order of reading and processing is controlled by 'phase'.
28 * All complete records are held in the record_queue before being
29 * processed, and all records will be processed in queue order.
30 *
31 * Internal states:
32 * running phase in_ record incoming
33 * checkpoint _queue _record
34 *
35 * Undefined undef undef undef undef undef
36 * Idle false undef false 0 0
37 * Active true NORMAL false 0/1 0/partial
38 * Active true BUFFERING true any 0/partial
39 * Active true UNBUFFERING true any 0
40 *
41 * While reading data from the stream, 'dc' is active and a callback
42 * is expected. Most actions in process_record() start a callback of
43 * their own. Those which don't return out and stream_continue() sets
44 * up the next action.
45 *
46 * PHASE_NORMAL:
47 * This phase is used for regular migration or resume from file.
 *    Records are read one at a time and immediately processed. (The
49 * record queue will not contain more than a single record.)
50 *
51 * PHASE_BUFFERING:
52 * This phase is used in checkpointed streams, when libxc signals
53 * the presence of a checkpoint in the stream. Records are read and
54 * buffered until a CHECKPOINT_END record has been read.
55 *
56 * PHASE_UNBUFFERING:
57 * Once a CHECKPOINT_END record has been read, all buffered records
58 * are processed.
59 *
60 * Note:
61 * Record buffers are not allocated from a GC; they are allocated
62 * and tracked manually. This is to avoid OOM with Remus where the
63 * AO lives for the lifetime of the process. Per-checkpoint AO's
64 * might be an avenue to explore.
65 *
66 * Entry points from outside:
67 * - libxl__stream_read_init()
68 * - Initialises state. Must be called once before _start()
69 * - libxl__stream_read_start()
70 * - Starts reading records from the stream, and acting on them.
71 * - libxl__stream_read_start_checkpoint()
72 * - Starts buffering records at a checkpoint. Must be called on
73 * a running stream.
74 *
75 * There are several chains of event:
76 *
77 * 1) Starting a stream follows:
78 * - libxl__stream_read_start()
79 * - stream_header_done()
80 * - stream_continue()
81 *
82 * 2) Reading a record follows:
83 * - stream_continue()
84 * - record_header_done()
85 * - record_body_done()
86 * - stream_continue()
87 *
 * 3) Processing a record has several chains to follow, depending on
 *    the record in question.
90 * 3a) "Simple" record:
91 * - process_record()
92 * - stream_continue()
93 * 3b) LIBXC record:
94 * - process_record()
95 * - libxl__xc_domain_restore()
96 * - libxl__xc_domain_restore_done()
97 * - stream_continue()
 *  3c) EMULATOR record:
 *    - process_record()
 *    - write_emulator_blob()
 *    - write_emulator_done()
 *    - stream_continue()
103 *
104 * Depending on the contents of the stream, there are likely to be several
105 * parallel tasks being managed. check_all_finished() is used to join all
106 * tasks in both success and error cases.
107 *
108 * Failover for remus
109 * - We buffer all records until a CHECKPOINT_END record is received
110 * - We will consume the buffered records when a CHECKPOINT_END record
111 * is received
112 * - If we find some internal error, then rc or retval is not 0 in
113 * libxl__xc_domain_restore_done(). In this case, we don't resume the
114 * guest
115 * - If we need to do failover from primary, then rc and retval are both
116 * 0 in libxl__xc_domain_restore_done(). In this case, the buffered
117 * state will be dropped, because we haven't received a CHECKPOINT_END
118 * record, and therefore the buffered state is inconsistent. In
119 * libxl__xc_domain_restore_done(), we just complete the stream and
120 * stream->completion_callback() will be called to resume the guest
121 *
122 * For back channel stream:
123 * - libxl__stream_read_start()
124 * - Set up the stream to running state
125 *
 *  - libxl__stream_read_checkpoint_state()
 *    - Set up reading the next record from a started stream.
 *      process_record() handles the CHECKPOINT_STATE record, then
 *      stream->checkpoint_callback() is called to return the result.
130 */
131
132 /* Success/error/cleanup handling. */
133 static void stream_complete(libxl__egc *egc,
134 libxl__stream_read_state *stream, int rc);
135 static void checkpoint_done(libxl__egc *egc,
136 libxl__stream_read_state *stream, int rc);
137 static void stream_done(libxl__egc *egc,
138 libxl__stream_read_state *stream, int rc);
139 static void conversion_done(libxl__egc *egc,
140 libxl__conversion_helper_state *chs, int rc);
141 static void check_all_finished(libxl__egc *egc,
142 libxl__stream_read_state *stream, int rc);
143
144 /* Event chain for first iteration, from _start(). */
145 static void stream_header_done(libxl__egc *egc,
146 libxl__datacopier_state *dc,
147 int rc, int onwrite, int errnoval);
148 static void stream_continue(libxl__egc *egc,
149 libxl__stream_read_state *stream);
150 static void setup_read_record(libxl__egc *egc,
151 libxl__stream_read_state *stream);
152 static void record_header_done(libxl__egc *egc,
153 libxl__datacopier_state *dc,
154 int rc, int onwrite, int errnoval);
155 static void record_body_done(libxl__egc *egc,
156 libxl__datacopier_state *dc,
157 int rc, int onwrite, int errnoval);
158 static bool process_record(libxl__egc *egc,
159 libxl__stream_read_state *stream);
160
161 /* Event chain for processing an emulator blob. */
162 static void write_emulator_blob(libxl__egc *egc,
163 libxl__stream_read_state *stream,
164 libxl__sr_record_buf *rec);
165 static void write_emulator_done(libxl__egc *egc,
166 libxl__datacopier_state *dc,
167 int rc, int onwrite, int errnoval);
168
169 /* Handlers for checkpoint state mini-loop */
170 static void checkpoint_state_done(libxl__egc *egc,
171 libxl__stream_read_state *stream, int rc);
172
173 /*----- Helpers -----*/
174
175 /* Helper to set up reading some data from the stream. */
setup_read(libxl__stream_read_state * stream,const char * what,void * ptr,size_t nr_bytes,libxl__datacopier_callback cb)176 static int setup_read(libxl__stream_read_state *stream,
177 const char *what, void *ptr, size_t nr_bytes,
178 libxl__datacopier_callback cb)
179 {
180 libxl__datacopier_state *dc = &stream->dc;
181
182 dc->readwhat = what;
183 dc->readbuf = ptr;
184 dc->bytes_to_read = nr_bytes;
185 dc->used = 0;
186 dc->callback = cb;
187
188 return libxl__datacopier_start(dc);
189 }
190
/*
 * Release a manually allocated record buffer together with its body.
 * A NULL @rec is ignored.
 */
static void free_record(libxl__sr_record_buf *rec)
{
    if (!rec)
        return;

    free(rec->body);
    free(rec);
}
198
199 /*----- Entrypoints -----*/
200
libxl__stream_read_init(libxl__stream_read_state * stream)201 void libxl__stream_read_init(libxl__stream_read_state *stream)
202 {
203 assert(stream->ao);
204
205 stream->shs.ao = stream->ao;
206 libxl__save_helper_init(&stream->shs);
207
208 stream->chs.ao = stream->ao;
209 libxl__conversion_helper_init(&stream->chs);
210
211 stream->rc = 0;
212 stream->running = false;
213 stream->in_checkpoint = false;
214 stream->sync_teardown = false;
215 FILLZERO(stream->dc);
216 FILLZERO(stream->hdr);
217 LIBXL_STAILQ_INIT(&stream->record_queue);
218 stream->phase = SRS_PHASE_NORMAL;
219 stream->recursion_guard = false;
220 stream->incoming_record = NULL;
221 FILLZERO(stream->emu_dc);
222 stream->emu_carefd = NULL;
223 }
224
/*
 * Begin reading a migration stream.
 *
 * Re-initialises @stream and marks it running in the normal phase.  For a
 * legacy stream, a conversion helper is started first and stream->fd
 * (and dcs->libxc_fd) are switched over to the helper's v2 output.  A
 * back channel stream is only primed here; any other stream starts by
 * reading the stream header.  Failures are reported via stream_complete().
 */
void libxl__stream_read_start(libxl__egc *egc,
                              libxl__stream_read_state *stream)
{
    libxl__datacopier_state *dc = &stream->dc;
    STATE_AO_GC(stream->ao);
    int rc;

    libxl__stream_read_init(stream);

    stream->running = true;
    stream->phase = SRS_PHASE_NORMAL;

    if (stream->legacy) {
        /*
         * Legacy streams are piped through a forked conversion helper.
         * It reads the existing stream->fd and emits a v2 stream on a new
         * pipe.  Substituting that pipe for stream->fd keeps the rest of
         * the read code unaware of whether conversion is happening.
         */
        libxl__conversion_helper_state *chs = &stream->chs;

        chs->legacy_fd = stream->fd;
        chs->hvm = (stream->dcs->guest_config->b_info.type ==
                    LIBXL_DOMAIN_TYPE_HVM);
        chs->completion_callback = conversion_done;

        rc = libxl__convert_legacy_stream(egc, chs);
        if (rc) {
            LOG(ERROR, "Failed to start the legacy stream conversion helper");
            goto err;
        }

        /* COLO back channels never carry a legacy stream. */
        assert(!stream->back_channel);

        /* *dc must still be untouched while stream->fd is replaced. */
        assert(dc->ao == NULL);
        assert(chs->v2_carefd);
        stream->fd = libxl__carefd_fd(chs->v2_carefd);
        stream->dcs->libxc_fd = stream->fd;
    }

    /* From here on, stream->fd carries a v2 stream. */
    dc->ao = stream->ao;
    dc->copywhat = "restore v2 stream";
    dc->readfd = stream->fd;
    dc->writefd = -1;

    /*
     * A back channel stream reads no stream header here.  Records are
     * requested later, presumably via libxl__stream_read_checkpoint_state().
     */
    if (stream->back_channel)
        return;

    rc = setup_read(stream, "stream header",
                    &stream->hdr, sizeof(stream->hdr),
                    stream_header_done);
    if (!rc)
        return;

 err:
    assert(rc);
    stream_complete(egc, stream, rc);
}
295
/*
 * Enter the buffering phase of a checkpointed stream.
 *
 * Libxc has handed control of the fd back to libxl.  Libxl records are now
 * read and buffered until a CHECKPOINT_END record arrives.  The stream
 * must be running and not already inside a checkpoint.
 */
void libxl__stream_read_start_checkpoint(libxl__egc *egc,
                                         libxl__stream_read_state *stream)
{
    assert(stream->running);
    assert(!stream->in_checkpoint);

    stream->phase = SRS_PHASE_BUFFERING;
    stream->in_checkpoint = true;

    stream_continue(egc, stream);
}
311
/*
 * Abort the stream with the (nonzero) error @rc.  Aborting a stream which
 * is not running is a no-op.
 */
void libxl__stream_read_abort(libxl__egc *egc,
                              libxl__stream_read_state *stream, int rc)
{
    assert(rc);

    if (!stream->running)
        return;

    stream_complete(egc, stream, rc);
}
320
321 /*----- Event logic -----*/
322
/*
 * Datacopier callback for the stream header read started by
 * libxl__stream_read_start().
 *
 * Converts the header fields from big endian in place and validates the
 * ident, version and options.  On success, reading moves on to records;
 * any failure completes the stream with an error.
 */
static void stream_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_hdr *const hdr = &stream->hdr;
    STATE_AO_GC(dc->ao);

    if (!rc) {
        /* The header fields arrive in big-endian byte order. */
        hdr->ident   = be64toh(hdr->ident);
        hdr->version = be32toh(hdr->version);
        hdr->options = be32toh(hdr->options);

        if (hdr->ident != RESTORE_STREAM_IDENT) {
            LOG(ERROR,
                "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64,
                RESTORE_STREAM_IDENT, hdr->ident);
            rc = ERROR_FAIL;
        } else if (hdr->version != RESTORE_STREAM_VERSION) {
            LOG(ERROR, "Unexpected Version: expected %"PRIu32", got %"PRIu32,
                RESTORE_STREAM_VERSION, hdr->version);
            rc = ERROR_FAIL;
        } else if (hdr->options & RESTORE_OPT_BIG_ENDIAN) {
            LOG(ERROR, "Unable to handle big endian streams");
            rc = ERROR_FAIL;
        } else {
            LOG(DEBUG, "Stream v%"PRIu32"%s", hdr->version,
                (hdr->options & RESTORE_OPT_LEGACY) ? " (from legacy)" : "");
            stream_continue(egc, stream);
            return;
        }
    }

    assert(rc);
    stream_complete(egc, stream, rc);
}
367
/*
 * Choose and start the next action for a running stream, according to
 * stream->phase.  Depending on the phase, this reads another record,
 * processes the single queued record, or processes all buffered records
 * of a checkpoint.
 *
 * Called whenever an asynchronous step has finished and nothing else has
 * been started (see the event chains in the comment at the top of the
 * file).
 */
static void stream_continue(libxl__egc *egc,
                            libxl__stream_read_state *stream)
{
    STATE_AO_GC(stream->ao);

    /*
     * Must not mutually recurse with process_record().
     *
     * For records whose processing function is synchronous
     * (e.g. TOOLSTACK), process_record() does not start another async
     * operation, and a further operation should be started.
     *
     * A naive solution, which would function in general, would be for
     * process_record() to call stream_continue(). However, this
     * would allow the content of the stream to cause mutual
     * recursion, and possibly for us to fall off our stack.
     *
     * Instead, process_record() indicates with its return value
     * whether a further operation needs to start, and the
     * recursion_guard is in place to catch any code paths which get
     * this wrong.
     */
    assert(stream->recursion_guard == false);
    stream->recursion_guard = true;

    switch (stream->phase) {
    case SRS_PHASE_NORMAL:
        /*
         * Normal phase (regular migration or restore from file):
         *
         * logically:
         *   do { read_record(); process_record(); } while ( not END );
         *
         * Alternate between reading a record from the stream, and
         * processing the record. There should never be two records
         * in the queue.
         */
        if (LIBXL_STAILQ_EMPTY(&stream->record_queue))
            setup_read_record(egc, stream);
        else {
            /* A synchronous record needs the next read started here. */
            if (process_record(egc, stream))
                setup_read_record(egc, stream);

            /*
             * process_record() had better have consumed the one and
             * only record in the queue.
             */
            assert(LIBXL_STAILQ_EMPTY(&stream->record_queue));
        }
        break;

    case SRS_PHASE_BUFFERING: {
        /*
         * Buffering phase (checkpointed streams only):
         *
         * logically:
         *   do { read_record(); } while ( not CHECKPOINT_END );
         *
         * Read and buffer all records from the stream until a
         * CHECKPOINT_END record is encountered. We need to peek at
         * the tail to spot the CHECKPOINT_END record, and switch to
         * the unbuffering phase.
         */
        libxl__sr_record_buf *rec = LIBXL_STAILQ_LAST(
            &stream->record_queue, libxl__sr_record_buf, entry);

        assert(stream->in_checkpoint);

        if (!rec || (rec->hdr.type != REC_TYPE_CHECKPOINT_END)) {
            setup_read_record(egc, stream);
            break;
        }

        /*
         * There are now some number of buffered records, with a
         * CHECKPOINT_END at the end. Start processing them all.
         */
        stream->phase = SRS_PHASE_UNBUFFERING;
    }
    /* FALLTHROUGH */
    case SRS_PHASE_UNBUFFERING:
        /*
         * Unbuffering phase (checkpointed streams only):
         *
         * logically:
         *   do { process_record(); } while ( not CHECKPOINT_END );
         *
         * Process all records collected during the buffering phase.
         */
        assert(stream->in_checkpoint);

        while (process_record(egc, stream))
            ; /*
               * Nothing! process_record() helpfully tells us if no specific
               * further actions have been set up, in which case we want to go
               * ahead and process the next record.
               */
        break;

    default:
        /* stream->phase only ever holds one of the SRS_PHASE_* values. */
        abort();
    }

    assert(stream->recursion_guard == true);
    stream->recursion_guard = false;
}
474
/*
 * Allocate a zeroed record buffer, park it in stream->incoming_record and
 * start reading its header; record_header_done() takes over from there.
 * If the read cannot be started, the stream is completed with the error.
 * The buffer is left in incoming_record, where stream_done() frees any
 * leftover.
 */
static void setup_read_record(libxl__egc *egc,
                              libxl__stream_read_state *stream)
{
    STATE_AO_GC(stream->ao);
    libxl__sr_record_buf *incoming;
    int rc;

    assert(stream->incoming_record == NULL);
    incoming = libxl__zalloc(NOGC, sizeof(*incoming));
    stream->incoming_record = incoming;

    rc = setup_read(stream, "record header",
                    &incoming->hdr, sizeof(incoming->hdr),
                    record_header_done);
    if (!rc)
        return;

    assert(rc);
    stream_complete(egc, stream, rc);
}
496
/*
 * Datacopier callback: the header of stream->incoming_record has been read.
 *
 * A header-only record (length 0) is already complete and goes straight
 * to record_body_done().  Otherwise a body buffer is allocated, sized as
 * hdr.length rounded up by ROUNDUP(..., REC_ALIGN_ORDER) (presumably the
 * stream's record padding), and reading it is started.  Failures complete
 * the stream with the error.
 */
static void record_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_record_buf *rec = stream->incoming_record;
    STATE_AO_GC(dc->ao);
    size_t body_len;

    if (!rc) {
        if (!rec->hdr.length) {
            /* No body: the record is complete already. */
            record_body_done(egc, dc, 0, 0, 0);
            return;
        }

        body_len = ROUNDUP(rec->hdr.length, REC_ALIGN_ORDER);
        rec->body = libxl__malloc(NOGC, body_len);

        rc = setup_read(stream, "record body",
                        rec->body, body_len,
                        record_body_done);
        if (!rc)
            return;
    }

    assert(rc);
    stream_complete(egc, stream, rc);
}
528
/*
 * Datacopier callback: the body of stream->incoming_record (if any) has
 * been read.  Ownership of the complete record moves to the tail of the
 * record queue, and stream_continue() decides what to do next.  Failures
 * complete the stream with the error.
 */
static void record_body_done(libxl__egc *egc,
                             libxl__datacopier_state *dc,
                             int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_record_buf *complete = stream->incoming_record;
    STATE_AO_GC(dc->ao);

    if (rc) {
        stream_complete(egc, stream, rc);
        return;
    }

    LIBXL_STAILQ_INSERT_TAIL(&stream->record_queue, complete, entry);
    stream->incoming_record = NULL;

    stream_continue(egc, stream);
}
550
551 /*
552 * Returns a boolean indicating whether a further action should be set
553 * up by the caller. This is needed to prevent mutual recursion with
554 * stream_continue().
555 *
556 * It is a bug for this function to ever call stream_continue() or
557 * setup_read_record().
558 */
process_record(libxl__egc * egc,libxl__stream_read_state * stream)559 static bool process_record(libxl__egc *egc,
560 libxl__stream_read_state *stream)
561 {
562 STATE_AO_GC(stream->ao);
563 libxl__domain_create_state *dcs = stream->dcs;
564 libxl__sr_record_buf *rec;
565 libxl_sr_checkpoint_state *srcs;
566 bool further_action_needed = false;
567 int rc = 0;
568
569 /* Pop a record from the head of the queue. */
570 assert(!LIBXL_STAILQ_EMPTY(&stream->record_queue));
571 rec = LIBXL_STAILQ_FIRST(&stream->record_queue);
572 LIBXL_STAILQ_REMOVE_HEAD(&stream->record_queue, entry);
573
574 LOG(DEBUG, "Record: %u, length %u", rec->hdr.type, rec->hdr.length);
575
576 switch (rec->hdr.type) {
577
578 case REC_TYPE_END:
579 stream_complete(egc, stream, 0);
580 break;
581
582 case REC_TYPE_LIBXC_CONTEXT:
583 libxl__xc_domain_restore(egc, dcs, &stream->shs, 0, 0);
584 break;
585
586 case REC_TYPE_EMULATOR_XENSTORE_DATA:
587 if (dcs->guest_config->b_info.type != LIBXL_DOMAIN_TYPE_HVM) {
588 rc = ERROR_FAIL;
589 LOG(ERROR,
590 "Received a xenstore emulator record when none was expected");
591 goto err;
592 }
593
594 if (rec->hdr.length < sizeof(libxl__sr_emulator_hdr)) {
595 rc = ERROR_FAIL;
596 LOG(ERROR,
597 "Emulator xenstore data record too short to contain header");
598 goto err;
599 }
600
601 rc = libxl__restore_emulator_xenstore_data(dcs,
602 rec->body + sizeof(libxl__sr_emulator_hdr),
603 rec->hdr.length - sizeof(libxl__sr_emulator_hdr));
604 if (rc)
605 goto err;
606
607 /*
608 * libxl__restore_emulator_xenstore_data() is a synchronous function.
609 * Request that our caller queues another action for us.
610 */
611 further_action_needed = true;
612 break;
613
614 case REC_TYPE_EMULATOR_CONTEXT:
615 if (dcs->guest_config->b_info.type != LIBXL_DOMAIN_TYPE_HVM) {
616 rc = ERROR_FAIL;
617 LOG(ERROR,
618 "Received an emulator context record when none was expected");
619 goto err;
620 }
621
622 write_emulator_blob(egc, stream, rec);
623 break;
624
625 case REC_TYPE_CHECKPOINT_END:
626 if (!stream->in_checkpoint) {
627 LOG(ERROR, "Unexpected CHECKPOINT_END record in stream");
628 rc = ERROR_FAIL;
629 goto err;
630 }
631 checkpoint_done(egc, stream, 0);
632 break;
633
634 case REC_TYPE_CHECKPOINT_STATE:
635 if (!stream->in_checkpoint_state) {
636 LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream");
637 rc = ERROR_FAIL;
638 goto err;
639 }
640
641 srcs = rec->body;
642 checkpoint_state_done(egc, stream, srcs->id);
643 break;
644
645 default:
646 LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type);
647 rc = ERROR_FAIL;
648 goto err;
649 }
650
651 assert(!rc);
652 free_record(rec);
653 return further_action_needed;
654
655 err:
656 assert(rc);
657 free_record(rec);
658 stream_complete(egc, stream, rc);
659 return false;
660 }
661
/*
 * Start writing the payload of an EMULATOR_CONTEXT record (everything
 * after its libxl__sr_emulator_hdr) to the device model restore file
 * LIBXL_DEVICE_MODEL_RESTORE_FILE".<domid>", using stream->emu_dc.
 *
 * Completion is reported to write_emulator_done().  Setup failures
 * (short record, path too long, open or datacopier start failure)
 * complete the stream with an error.
 */
static void write_emulator_blob(libxl__egc *egc,
                                libxl__stream_read_state *stream,
                                libxl__sr_record_buf *rec)
{
    libxl__domain_create_state *dcs = stream->dcs;
    libxl__datacopier_state *dc = &stream->emu_dc;
    libxl__sr_emulator_hdr *emu_hdr;
    STATE_AO_GC(stream->ao);
    char path[256];
    int rc = 0, writefd, len;

    if (rec->hdr.length < sizeof(*emu_hdr)) {
        rc = ERROR_FAIL;
        LOG(ERROR, "Emulator record too short to contain header");
        goto err;
    }
    emu_hdr = rec->body;

    /* Bounded formatting: fail rather than overflow or truncate the path. */
    len = snprintf(path, sizeof(path),
                   LIBXL_DEVICE_MODEL_RESTORE_FILE".%u", dcs->guest_domid);
    if (len < 0 || (size_t)len >= sizeof(path)) {
        rc = ERROR_FAIL;
        LOG(ERROR, "Emulator restore file path too long");
        goto err;
    }

    assert(stream->emu_carefd == NULL);
    libxl__carefd_begin();
    writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
    stream->emu_carefd = libxl__carefd_opened(CTX, writefd);

    if (writefd == -1) {
        rc = ERROR_FAIL;
        LOGE(ERROR, "unable to open %s", path);
        goto err;
    }

    FILLZERO(*dc);
    dc->ao = stream->ao;
    dc->writewhat = "qemu save file";
    dc->copywhat = "restore v2 stream";
    dc->writefd = writefd;
    dc->readfd = -1;
    dc->maxsz = -1;
    dc->callback = write_emulator_done;

    rc = libxl__datacopier_start(dc);
    if (rc)
        goto err;

    /* The whole payload is queued up front; nothing is read from an fd. */
    libxl__datacopier_prefixdata(egc, dc,
                                 rec->body + sizeof(*emu_hdr),
                                 rec->hdr.length - sizeof(*emu_hdr));
    return;

 err:
    assert(rc);
    stream_complete(egc, stream, rc);
}
715
/*
 * Datacopier callback: writing the emulator blob has finished or failed.
 * The restore file is closed in either case.  On success the stream
 * resumes; on failure it is completed with the error.
 */
static void write_emulator_done(libxl__egc *egc,
                                libxl__datacopier_state *dc,
                                int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, emu_dc);
    STATE_AO_GC(dc->ao);

    libxl__carefd_close(stream->emu_carefd);
    stream->emu_carefd = NULL;

    if (rc) {
        stream_complete(egc, stream, rc);
        return;
    }

    stream_continue(egc, stream);
}
736
737 /*----- Success/error/cleanup handling. -----*/
738
/*
 * Finish the current unit of work on a running stream with result @rc.
 *
 * Inside a checkpoint, or inside a CHECKPOINT_STATE read, only errors
 * end things early.  The error is handed back through the matching
 * *_done() handler, and the failure comes back around to us later:
 *  - via libxl__xc_domain_restore_done() for a normal stream, or
 *  - via libxl__stream_read_abort() for a back channel stream.
 * Otherwise the whole stream is torn down by stream_done().
 */
static void stream_complete(libxl__egc *egc,
                            libxl__stream_read_state *stream, int rc)
{
    assert(stream->running);

    if (stream->in_checkpoint) {
        assert(rc);
        checkpoint_done(egc, stream, rc);
    } else if (stream->in_checkpoint_state) {
        assert(rc);
        checkpoint_state_done(egc, stream, rc);
    } else {
        stream_done(egc, stream, rc);
    }
}
773
/*
 * Leave a checkpoint and report its outcome to stream->checkpoint_callback
 * as an XGR_CHECKPOINT_* code:
 *  - SUCCESS when @rc is 0;
 *  - FAILOVER when the error happened while still buffering (no
 *    CHECKPOINT_END seen yet);
 *  - ERROR otherwise.
 * The stream then returns to the normal phase.
 *
 * On failure, any records still buffered for this checkpoint are
 * discarded.  They belong to an incomplete or failed checkpoint and must
 * never be processed ("the buffered state will be dropped").  Leaving
 * them queued would also break the NORMAL phase's "at most one queued
 * record" invariant.  It would trip stream_done()'s "queue empty on
 * success" assertion too, when a Remus failover later completes the
 * stream with rc == 0.
 */
static void checkpoint_done(libxl__egc *egc,
                            libxl__stream_read_state *stream, int rc)
{
    libxl__sr_record_buf *rec, *trec;
    int ret;

    assert(stream->in_checkpoint);

    if (rc == 0)
        ret = XGR_CHECKPOINT_SUCCESS;
    else if (stream->phase == SRS_PHASE_BUFFERING)
        ret = XGR_CHECKPOINT_FAILOVER;
    else
        ret = XGR_CHECKPOINT_ERROR;

    stream->checkpoint_callback(egc, stream, ret);

    stream->in_checkpoint = false;
    stream->phase = SRS_PHASE_NORMAL;

    if (rc) {
        LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec)
            free_record(rec);
        LIBXL_STAILQ_INIT(&stream->record_queue);
    }
}
793
/*
 * Final teardown of a stream which is not inside a checkpoint.
 *
 * Marks the stream stopped and releases the partially read record, the
 * emulator restore file, the conversion helper's v2 pipe and any queued
 * records.  Then, except for back channel streams, @rc is reported to
 * check_all_finished().
 */
static void stream_done(libxl__egc *egc,
                        libxl__stream_read_state *stream, int rc)
{
    libxl__sr_record_buf *rec, *trec;

    assert(stream->running);
    assert(!stream->in_checkpoint);
    assert(!stream->in_checkpoint_state);
    stream->running = false;

    /*
     * Release owned resources and clear the pointers, so that no later
     * path (e.g. a late write_emulator_done(), which closes emu_carefd)
     * can free or close them again through a stale pointer.
     */
    if (stream->incoming_record) {
        free_record(stream->incoming_record);
        stream->incoming_record = NULL;
    }

    if (stream->emu_carefd) {
        libxl__carefd_close(stream->emu_carefd);
        stream->emu_carefd = NULL;
    }

    /* If we started a conversion helper, we took ownership of its carefd. */
    if (stream->chs.v2_carefd) {
        libxl__carefd_close(stream->chs.v2_carefd);
        stream->chs.v2_carefd = NULL;
    }

    /*
     * The record queue had better be empty if the stream believes itself
     * to have been successful.  stream->rc is only recorded later, by
     * check_all_finished(), so a failing @rc also counts as unsuccessful.
     */
    assert(LIBXL_STAILQ_EMPTY(&stream->record_queue) || stream->rc || rc);

    LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec)
        free_record(rec);
    /* The entries are gone; leave the queue head validly empty. */
    LIBXL_STAILQ_INIT(&stream->record_queue);

    if (!stream->back_channel) {
        /*
         * Only forward streams join with check_all_finished().  A back
         * channel stream is used by the save side to read records sent by
         * the restore side.  It has no restore helper, never carries a
         * legacy stream (so no conversion helper), and has no
         * completion_callback, so there is nothing to join.
         */
        check_all_finished(egc, stream, rc);
    }
}
835
/*
 * Save/restore helper callback: libxc has finished with the stream.
 *
 * @rc is a libxl error code; @retval and @errnoval presumably report
 * libxc's own result (a nonzero @retval is logged with @errnoval and
 * turned into ERROR_FAIL).  The result is fed into check_all_finished().
 * If the stream is still running after that, the next step depends on
 * the kind of checkpointed stream:
 *  - COLO: call completion_callback (if set) with 0, otherwise complete
 *    the stream successfully;
 *  - Remus: failover, so complete the stream successfully;
 *  - none: resume reading libxl records via stream_continue().
 */
void libxl__xc_domain_restore_done(libxl__egc *egc, void *dcs_void,
                                   int rc, int retval, int errnoval)
{
    libxl__domain_create_state *dcs = dcs_void;
    libxl__stream_read_state *stream = &dcs->srs;
    STATE_AO_GC(dcs->ao);

    /* convenience aliases */
    const int checkpointed_stream = dcs->restore_params.checkpointed_stream;

    if (rc)
        goto err;

    if (retval) {
        LOGEV(ERROR, errnoval, "restoring domain");
        rc = ERROR_FAIL;
        goto err;
    }

    /* NB: the success path (rc == 0) also falls through to this label. */
 err:
    check_all_finished(egc, stream, rc);

    /*
     * This function is the callback associated with the save helper
     * task, not the stream task. We do not know whether the stream is
     * alive, and check_all_finished() may have torn it down around us.
     * If the stream is not still alive, we must not continue any work.
     */
    if (libxl__stream_read_inuse(stream)) {
        switch (checkpointed_stream) {
        case LIBXL_CHECKPOINTED_STREAM_COLO:
            if (stream->completion_callback) {
                /*
                 * restore, just build the secondary vm, don't close
                 * the stream
                 */
                stream->completion_callback(egc, stream, 0);
            } else {
                /* failover, just close the stream */
                stream_complete(egc, stream, 0);
            }
            break;
        case LIBXL_CHECKPOINTED_STREAM_REMUS:
            /*
             * Failover from primary. Domain state is currently at a
             * consistent checkpoint, complete the stream, and call
             * stream->completion_callback() to resume the guest.
             */
            stream_complete(egc, stream, 0);
            break;
        case LIBXL_CHECKPOINTED_STREAM_NONE:
            /*
             * Libxc has indicated that it is done with the stream.
             * Resume reading libxl records from it.
             */
            stream_continue(egc, stream);
            break;
        }
    }
}
896
/*
 * Completion callback of the legacy conversion helper: report its result
 * to the join point shared with the other parallel tasks.
 */
static void conversion_done(libxl__egc *egc,
                            libxl__conversion_helper_state *chs, int rc)
{
    libxl__stream_read_state *owner = CONTAINER_OF(chs, *owner, chs);

    check_all_finished(egc, owner, rc);
}
904
/*
 * Join point for the stream and its parallel tasks (the save/restore
 * helper in stream->shs and the conversion helper in stream->chs).
 *
 * The first nonzero @rc is recorded in stream->rc, and all tasks are asked
 * to abort.  Once none of the tasks is still in use,
 * stream->completion_callback (if set) is fired with stream->rc.
 */
static void check_all_finished(libxl__egc *egc,
                               libxl__stream_read_state *stream, int rc)
{
    STATE_AO_GC(stream->ao);

    /*
     * In the case of a failure, the _abort()'s below might cancel
     * synchronously on top of us, or asynchronously at a later point.
     *
     * We must avoid the situation where all _abort() cancel
     * synchronously and the completion_callback() gets called twice;
     * once by the first error and once by the final stacked abort(),
     * both of whom will find that all of the tasks have stopped.
     *
     * To avoid this problem, any stacked re-entry into this function is
     * ineligible to fire the completion callback. The outermost
     * instance will take care of completing, once the stack has
     * unwound.
     */
    if (stream->sync_teardown)
        return;

    if (!stream->rc && rc) {
        /* First reported failure. Tear everything down. */
        stream->rc = rc;
        stream->sync_teardown = true;

        libxl__stream_read_abort(egc, stream, rc);
        libxl__save_helper_abort(egc, &stream->shs);
        libxl__conversion_helper_abort(egc, &stream->chs, rc);

        stream->sync_teardown = false;
    }

    /* Don't fire the callback until all our parallel tasks have stopped. */
    if (libxl__stream_read_inuse(stream) ||
        libxl__save_helper_inuse(&stream->shs) ||
        libxl__conversion_helper_inuse(&stream->chs))
        return;

    if (stream->completion_callback)
        /* back channel stream doesn't have completion_callback() */
        stream->completion_callback(egc, stream, stream->rc);
}
949
950 /*----- Checkpoint state handlers -----*/
951
/*
 * Read a single record from a running stream that is not inside a
 * checkpoint.  process_record() accepts it only if it is a
 * CHECKPOINT_STATE record, and the outcome is delivered to
 * stream->checkpoint_callback via checkpoint_state_done().
 */
void libxl__stream_read_checkpoint_state(libxl__egc *egc,
                                         libxl__stream_read_state *stream)
{
    assert(stream->running &&
           !stream->in_checkpoint &&
           !stream->in_checkpoint_state);

    stream->in_checkpoint_state = true;
    setup_read_record(egc, stream);
}
962
/*
 * Leave the single-record CHECKPOINT_STATE read and pass @rc to
 * stream->checkpoint_callback.  @rc is either the id from the record
 * (see process_record()) or a libxl error code (see stream_complete()).
 */
static void checkpoint_state_done(libxl__egc *egc,
                                  libxl__stream_read_state *stream, int rc)
{
    libxl__stream_read_state *const srs = stream;

    assert(srs->in_checkpoint_state);
    srs->in_checkpoint_state = false;

    srs->checkpoint_callback(egc, srs, rc);
}
970
971 /*
972 * Local variables:
973 * mode: C
974 * c-basic-offset: 4
975 * indent-tabs-mode: nil
976 * End:
977 */
978