1 /*
2  * Copyright (C) 2015      Citrix Ltd.
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU Lesser General Public License as published
6  * by the Free Software Foundation; version 2.1 only. with the special
7  * exception on linking described in file LICENSE.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU Lesser General Public License for more details.
13  */
14 
15 #include "libxl_osdeps.h" /* must come before any other headers */
16 
17 #include "libxl_internal.h"
18 
19 /*
20  * Infrastructure for reading and acting on the contents of a libxl
21  * migration stream. There are a lot of moving parts here.
22  *
23  * The logic revolves around two actions; reading another record from
24  * the stream, and processing the records.  The stream_continue()
25  * function is responsible for choosing the next action to perform.
26  *
27  * The exact order of reading and processing is controlled by 'phase'.
28  * All complete records are held in the record_queue before being
29  * processed, and all records will be processed in queue order.
30  *
31  * Internal states:
32  *           running  phase       in_         record   incoming
33  *                                checkpoint  _queue   _record
34  *
35  * Undefined    undef  undef        undef       undef    undef
36  * Idle         false  undef        false       0        0
37  * Active       true   NORMAL       false       0/1      0/partial
38  * Active       true   BUFFERING    true        any      0/partial
39  * Active       true   UNBUFFERING  true        any      0
40  *
41  * While reading data from the stream, 'dc' is active and a callback
42  * is expected.  Most actions in process_record() start a callback of
43  * their own.  Those which don't return out and stream_continue() sets
44  * up the next action.
45  *
46  * PHASE_NORMAL:
47  *   This phase is used for regular migration or resume from file.
48  *   Records are read one at a time and immediately processed.  (The
49  *   record queue will not contain more than a single record.)
50  *
51  * PHASE_BUFFERING:
52  *   This phase is used in checkpointed streams, when libxc signals
53  *   the presence of a checkpoint in the stream.  Records are read and
54  *   buffered until a CHECKPOINT_END record has been read.
55  *
56  * PHASE_UNBUFFERING:
57  *   Once a CHECKPOINT_END record has been read, all buffered records
58  *   are processed.
59  *
60  * Note:
61  *   Record buffers are not allocated from a GC; they are allocated
62  *   and tracked manually.  This is to avoid OOM with Remus where the
63  *   AO lives for the lifetime of the process.  Per-checkpoint AO's
64  *   might be an avenue to explore.
65  *
66  * Entry points from outside:
67  *  - libxl__stream_read_init()
68  *     - Initialises state.  Must be called once before _start()
69  *  - libxl__stream_read_start()
70  *     - Starts reading records from the stream, and acting on them.
71  *  - libxl__stream_read_start_checkpoint()
72  *     - Starts buffering records at a checkpoint.  Must be called on
73  *       a running stream.
74  *
75  * There are several chains of event:
76  *
77  * 1) Starting a stream follows:
78  *    - libxl__stream_read_start()
79  *    - stream_header_done()
80  *    - stream_continue()
81  *
82  * 2) Reading a record follows:
83  *    - stream_continue()
84  *    - record_header_done()
85  *    - record_body_done()
86  *    - stream_continue()
87  *
88  * 3) Processing a record has several chains to follow, depending on
89  *    the record in question.
90  * 3a) "Simple" record:
91  *    - process_record()
92  *    - stream_continue()
93  * 3b) LIBXC record:
94  *    - process_record()
95  *    - libxl__xc_domain_restore()
96  *    - libxl__xc_domain_restore_done()
97  *    - stream_continue()
98  * 3c) EMULATOR record:
99  *    - process_record()
100  *    - write_emulator_blob()
101  *    - write_emulator_done()
102  *    - stream_continue()
103  *
104  * Depending on the contents of the stream, there are likely to be several
105  * parallel tasks being managed.  check_all_finished() is used to join all
106  * tasks in both success and error cases.
107  *
108  * Failover for remus
109  *  - We buffer all records until a CHECKPOINT_END record is received
110  *  - We will consume the buffered records when a CHECKPOINT_END record
111  *    is received
112  *  - If we find some internal error, then rc or retval is not 0 in
113  *    libxl__xc_domain_restore_done(). In this case, we don't resume the
114  *    guest
115  *  - If we need to do failover from primary, then rc and retval are both
116  *    0 in libxl__xc_domain_restore_done(). In this case, the buffered
117  *    state will be dropped, because we haven't received a CHECKPOINT_END
118  *    record, and therefore the buffered state is inconsistent. In
119  *    libxl__xc_domain_restore_done(), we just complete the stream and
120  *    stream->completion_callback() will be called to resume the guest
121  *
122  * For back channel stream:
123  * - libxl__stream_read_start()
124  *    - Set up the stream to running state
125  *
126  * - libxl__stream_read_continue()
127  *     - Set up reading the next record from a started stream.
128  *       Add some code to process_record() to handle the record.
129  *       Then call stream->checkpoint_callback() to return.
130  */
131 
132 /* Success/error/cleanup handling. */
133 static void stream_complete(libxl__egc *egc,
134                             libxl__stream_read_state *stream, int rc);
135 static void checkpoint_done(libxl__egc *egc,
136                             libxl__stream_read_state *stream, int rc);
137 static void stream_done(libxl__egc *egc,
138                         libxl__stream_read_state *stream, int rc);
139 static void conversion_done(libxl__egc *egc,
140                             libxl__conversion_helper_state *chs, int rc);
141 static void check_all_finished(libxl__egc *egc,
142                                libxl__stream_read_state *stream, int rc);
143 
144 /* Event chain for first iteration, from _start(). */
145 static void stream_header_done(libxl__egc *egc,
146                                libxl__datacopier_state *dc,
147                                int rc, int onwrite, int errnoval);
148 static void stream_continue(libxl__egc *egc,
149                             libxl__stream_read_state *stream);
150 static void setup_read_record(libxl__egc *egc,
151                               libxl__stream_read_state *stream);
152 static void record_header_done(libxl__egc *egc,
153                                libxl__datacopier_state *dc,
154                                int rc, int onwrite, int errnoval);
155 static void record_body_done(libxl__egc *egc,
156                              libxl__datacopier_state *dc,
157                              int rc, int onwrite, int errnoval);
158 static bool process_record(libxl__egc *egc,
159                            libxl__stream_read_state *stream);
160 
161 /* Event chain for processing an emulator blob. */
162 static void write_emulator_blob(libxl__egc *egc,
163                                 libxl__stream_read_state *stream,
164                                 libxl__sr_record_buf *rec);
165 static void write_emulator_done(libxl__egc *egc,
166                                 libxl__datacopier_state *dc,
167                                 int rc, int onwrite, int errnoval);
168 
169 /* Handlers for checkpoint state mini-loop */
170 static void checkpoint_state_done(libxl__egc *egc,
171                                   libxl__stream_read_state *stream, int rc);
172 
173 /*----- Helpers -----*/
174 
175 /* Helper to set up reading some data from the stream. */
setup_read(libxl__stream_read_state * stream,const char * what,void * ptr,size_t nr_bytes,libxl__datacopier_callback cb)176 static int setup_read(libxl__stream_read_state *stream,
177                       const char *what, void *ptr, size_t nr_bytes,
178                       libxl__datacopier_callback cb)
179 {
180     libxl__datacopier_state *dc = &stream->dc;
181 
182     dc->readwhat      = what;
183     dc->readbuf       = ptr;
184     dc->bytes_to_read = nr_bytes;
185     dc->used          = 0;
186     dc->callback      = cb;
187 
188     return libxl__datacopier_start(dc);
189 }
190 
/* Release a record buffer together with its body.  NULL is ignored. */
static void free_record(libxl__sr_record_buf *rec)
{
    if (!rec)
        return;

    free(rec->body);
    free(rec);
}
198 
199 /*----- Entrypoints -----*/
200 
/*
 * Reset a stream to its idle state.  The caller must already have set
 * stream->ao; it is propagated to the save and conversion helpers
 * before they are initialised.
 */
void libxl__stream_read_init(libxl__stream_read_state *stream)
{
    assert(stream->ao);

    /* Embedded helper states share the stream's ao. */
    stream->shs.ao = stream->ao;
    libxl__save_helper_init(&stream->shs);

    stream->chs.ao = stream->ao;
    libxl__conversion_helper_init(&stream->chs);

    /* Flags and status. */
    stream->rc              = 0;
    stream->running         = false;
    stream->in_checkpoint   = false;
    stream->sync_teardown   = false;
    stream->recursion_guard = false;
    stream->phase           = SRS_PHASE_NORMAL;

    /* Main datacopier, stream header and record tracking. */
    FILLZERO(stream->dc);
    FILLZERO(stream->hdr);
    LIBXL_STAILQ_INIT(&stream->record_queue);
    stream->incoming_record = NULL;

    /* Emulator blob writer. */
    FILLZERO(stream->emu_dc);
    stream->emu_carefd = NULL;
}
224 
/*
 * Begin reading a stream.
 *
 * Re-initialises the stream state, marks it running, optionally
 * interposes the legacy conversion helper, and then (except for back
 * channel streams) starts reading the stream header.  Any failure is
 * reported through stream_complete().
 */
void libxl__stream_read_start(libxl__egc *egc,
                              libxl__stream_read_state *stream)
{
    libxl__datacopier_state *dc = &stream->dc;
    STATE_AO_GC(stream->ao);
    int rc = 0;

    libxl__stream_read_init(stream);

    stream->phase   = SRS_PHASE_NORMAL;
    stream->running = true;

    if (stream->legacy) {
        /*
         * Legacy stream: hand it to the conversion helper.
         *
         * This forks and execs the conversion helper script, passing
         * it the existing stream->fd as input.  The converted stream
         * comes back on a new pipe, whose fd then replaces stream->fd
         * so the rest of the read code need not care whether
         * conversion is taking place.
         */
        libxl__conversion_helper_state *chs = &stream->chs;

        chs->completion_callback = conversion_done;
        chs->legacy_fd = stream->fd;
        chs->hvm =
            (stream->dcs->guest_config->b_info.type == LIBXL_DOMAIN_TYPE_HVM);

        rc = libxl__convert_legacy_stream(egc, chs);
        if (rc) {
            LOG(ERROR, "Failed to start the legacy stream conversion helper");
            goto fail;
        }

        /* COLO back channels and legacy conversion must never mix. */
        assert(!stream->back_channel);

        /* *dc must still be zeroed while stream->fd is swapped. */
        assert(dc->ao == NULL);
        assert(stream->chs.v2_carefd);
        stream->fd = libxl__carefd_fd(stream->chs.v2_carefd);
        stream->dcs->libxc_fd = stream->fd;
    }

    /* From here on stream->fd carries a v2 stream. */
    dc->ao       = stream->ao;
    dc->copywhat = "restore v2 stream";
    dc->readfd   = stream->fd;
    dc->writefd  = -1;

    /* Nothing is read from a back channel at start time. */
    if (stream->back_channel)
        return;

    /* Kick off the read of the stream header. */
    rc = setup_read(stream, "stream header",
                    &stream->hdr, sizeof(stream->hdr),
                    stream_header_done);
    if (!rc)
        return;

 fail:
    assert(rc);
    stream_complete(egc, stream, rc);
}
295 
/*
 * Enter the buffering phase of a checkpoint.  Only valid on a running
 * stream which is not already inside a checkpoint.
 */
void libxl__stream_read_start_checkpoint(libxl__egc *egc,
                                         libxl__stream_read_state *stream)
{
    assert(stream->running);
    assert(!stream->in_checkpoint);

    stream->phase = SRS_PHASE_BUFFERING;
    stream->in_checkpoint = true;

    /*
     * Libxc has given us control of the fd; begin pulling libxl
     * records out of it.
     */
    stream_continue(egc, stream);
}
311 
/*
 * Abort the stream with the (non-zero) error 'rc'.  Does nothing if
 * the stream is not running.
 */
void libxl__stream_read_abort(libxl__egc *egc,
                              libxl__stream_read_state *stream, int rc)
{
    assert(rc);

    if (!stream->running)
        return;

    stream_complete(egc, stream, rc);
}
320 
321 /*----- Event logic -----*/
322 
/*
 * Datacopier callback for the stream header read.
 *
 * Converts the header fields from big-endian to host order, validates
 * the ident, the version and the endianness option, and then hands
 * over to stream_continue().  Any failure completes the stream with an
 * error.
 */
static void stream_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_hdr *hdr = &stream->hdr;
    STATE_AO_GC(dc->ao);
    const char *origin;

    if (rc)
        goto fail;

    hdr->ident   = be64toh(hdr->ident);
    hdr->version = be32toh(hdr->version);
    hdr->options = be32toh(hdr->options);

    if (hdr->ident != RESTORE_STREAM_IDENT) {
        LOG(ERROR,
            "Invalid ident: expected 0x%016"PRIx64", got 0x%016"PRIx64,
            RESTORE_STREAM_IDENT, hdr->ident);
        rc = ERROR_FAIL;
        goto fail;
    }

    if (hdr->version != RESTORE_STREAM_VERSION) {
        LOG(ERROR, "Unexpected Version: expected %"PRIu32", got %"PRIu32,
            RESTORE_STREAM_VERSION, hdr->version);
        rc = ERROR_FAIL;
        goto fail;
    }

    if (hdr->options & RESTORE_OPT_BIG_ENDIAN) {
        LOG(ERROR, "Unable to handle big endian streams");
        rc = ERROR_FAIL;
        goto fail;
    }

    origin = (hdr->options & RESTORE_OPT_LEGACY) ? " (from legacy)" : "";
    LOG(DEBUG, "Stream v%"PRIu32"%s", hdr->version, origin);

    stream_continue(egc, stream);
    return;

 fail:
    assert(rc);
    stream_complete(egc, stream, rc);
}
367 
/*
 * Choose and start the next action (read a record, or process queued
 * records) according to the current phase.
 */
static void stream_continue(libxl__egc *egc,
                            libxl__stream_read_state *stream)
{
    STATE_AO_GC(stream->ao);

    /*
     * This function and process_record() must never recurse into each
     * other.
     *
     * Some records (e.g. TOOLSTACK) are handled synchronously, so
     * process_record() starts no new asynchronous operation and some
     * further operation has to be started on its behalf.
     *
     * Letting process_record() simply call stream_continue() would
     * work in general, but would let the stream contents drive mutual
     * recursion, potentially until we run out of stack.
     *
     * So instead process_record() reports through its return value
     * whether a further operation is needed, and recursion_guard
     * catches any code path which gets this wrong.
     */
    assert(stream->recursion_guard == false);
    stream->recursion_guard = true;

    switch (stream->phase) {
    case SRS_PHASE_NORMAL:
        /*
         * Regular migration, or restore from file.  Logically:
         *
         *   do { read_record(); process_record(); } while ( not END );
         *
         * Reading and processing alternate, so the queue never holds
         * two records.
         */
        if (!LIBXL_STAILQ_EMPTY(&stream->record_queue)) {
            bool read_next = process_record(egc, stream);

            if (read_next)
                setup_read_record(egc, stream);

            /*
             * The single queued record must have been consumed by
             * process_record().
             */
            assert(LIBXL_STAILQ_EMPTY(&stream->record_queue));
        } else {
            setup_read_record(egc, stream);
        }
        break;

    case SRS_PHASE_BUFFERING: {
        /*
         * Checkpointed streams only.  Logically:
         *
         *   do { read_record(); } while ( not CHECKPOINT_END );
         *
         * Keep reading and queueing records until the tail of the
         * queue is a CHECKPOINT_END record, then switch over to
         * unbuffering.
         */
        libxl__sr_record_buf *tail = LIBXL_STAILQ_LAST(
            &stream->record_queue, libxl__sr_record_buf, entry);

        assert(stream->in_checkpoint);

        if (!tail || (tail->hdr.type != REC_TYPE_CHECKPOINT_END)) {
            setup_read_record(egc, stream);
            break;
        }

        /*
         * The queue now ends with CHECKPOINT_END; go and process
         * everything which was buffered.
         */
        stream->phase = SRS_PHASE_UNBUFFERING;
    }
        /* FALLTHROUGH */
    case SRS_PHASE_UNBUFFERING:
        /*
         * Checkpointed streams only.  Logically:
         *
         *   do { process_record(); } while ( not CHECKPOINT_END );
         *
         * Work through the records gathered while buffering.
         */
        assert(stream->in_checkpoint);

        /*
         * process_record() returns true when it set up no further
         * action of its own, in which case we move straight on to the
         * next queued record.
         */
        for (;;) {
            if (!process_record(egc, stream))
                break;
        }
        break;

    default:
        abort();
    }

    assert(stream->recursion_guard == true);
    stream->recursion_guard = false;
}
474 
/*
 * Allocate a fresh (non-GC) record buffer, register it as the stream's
 * incoming record, and start reading the record header into it.
 */
static void setup_read_record(libxl__egc *egc,
                              libxl__stream_read_state *stream)
{
    libxl__sr_record_buf *rec;
    STATE_AO_GC(stream->ao);
    int rc;

    /* Only one record may be in flight at any time. */
    assert(stream->incoming_record == NULL);

    rec = libxl__zalloc(NOGC, sizeof(*rec));
    stream->incoming_record = rec;

    rc = setup_read(stream, "record header",
                    &rec->hdr, sizeof(rec->hdr),
                    record_header_done);
    if (rc)
        stream_complete(egc, stream, rc);
}
496 
/*
 * Datacopier callback for a record header read.
 *
 * A record with no body is passed straight on to record_body_done().
 * Otherwise a (non-GC) body buffer of the record length rounded up via
 * ROUNDUP(..., REC_ALIGN_ORDER) is allocated and a read of the body is
 * started.
 */
static void record_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_record_buf *rec = stream->incoming_record;
    STATE_AO_GC(dc->ao);

    if (rc)
        goto fail;

    if (rec->hdr.length != 0) {
        /*
         * NOTE(review): the length comes straight from the stream and
         * no upper bound is applied before allocating - confirm
         * whether a limit should be enforced here.
         */
        size_t padded_len = ROUNDUP(rec->hdr.length, REC_ALIGN_ORDER);

        rec->body = libxl__malloc(NOGC, padded_len);

        rc = setup_read(stream, "record body",
                        rec->body, padded_len,
                        record_body_done);
        if (rc)
            goto fail;
        return;
    }

    /* Empty body: the record is already complete. */
    record_body_done(egc, dc, 0, 0, 0);
    return;

 fail:
    assert(rc);
    stream_complete(egc, stream, rc);
}
528 
/*
 * Datacopier callback for a record body read (also called directly
 * for records with no body).  Moves the now-complete incoming record
 * onto the tail of the record queue and lets stream_continue() decide
 * what happens next.
 */
static void record_body_done(libxl__egc *egc,
                             libxl__datacopier_state *dc,
                             int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, dc);
    libxl__sr_record_buf *complete = stream->incoming_record;
    STATE_AO_GC(dc->ao);

    if (rc) {
        stream_complete(egc, stream, rc);
        return;
    }

    /* The queue takes over ownership of the record. */
    LIBXL_STAILQ_INSERT_TAIL(&stream->record_queue, complete, entry);
    stream->incoming_record = NULL;

    stream_continue(egc, stream);
}
550 
551 /*
552  * Returns a boolean indicating whether a further action should be set
553  * up by the caller.  This is needed to prevent mutual recursion with
554  * stream_continue().
555  *
556  * It is a bug for this function to ever call stream_continue() or
557  * setup_read_record().
558  */
process_record(libxl__egc * egc,libxl__stream_read_state * stream)559 static bool process_record(libxl__egc *egc,
560                            libxl__stream_read_state *stream)
561 {
562     STATE_AO_GC(stream->ao);
563     libxl__domain_create_state *dcs = stream->dcs;
564     libxl__sr_record_buf *rec;
565     libxl_sr_checkpoint_state *srcs;
566     bool further_action_needed = false;
567     int rc = 0;
568 
569     /* Pop a record from the head of the queue. */
570     assert(!LIBXL_STAILQ_EMPTY(&stream->record_queue));
571     rec = LIBXL_STAILQ_FIRST(&stream->record_queue);
572     LIBXL_STAILQ_REMOVE_HEAD(&stream->record_queue, entry);
573 
574     LOG(DEBUG, "Record: %u, length %u", rec->hdr.type, rec->hdr.length);
575 
576     switch (rec->hdr.type) {
577 
578     case REC_TYPE_END:
579         stream_complete(egc, stream, 0);
580         break;
581 
582     case REC_TYPE_LIBXC_CONTEXT:
583         libxl__xc_domain_restore(egc, dcs, &stream->shs, 0, 0);
584         break;
585 
586     case REC_TYPE_EMULATOR_XENSTORE_DATA:
587         if (dcs->guest_config->b_info.type != LIBXL_DOMAIN_TYPE_HVM) {
588             rc = ERROR_FAIL;
589             LOG(ERROR,
590                 "Received a xenstore emulator record when none was expected");
591             goto err;
592         }
593 
594         if (rec->hdr.length < sizeof(libxl__sr_emulator_hdr)) {
595             rc = ERROR_FAIL;
596             LOG(ERROR,
597                 "Emulator xenstore data record too short to contain header");
598             goto err;
599         }
600 
601         rc = libxl__restore_emulator_xenstore_data(dcs,
602             rec->body + sizeof(libxl__sr_emulator_hdr),
603             rec->hdr.length - sizeof(libxl__sr_emulator_hdr));
604         if (rc)
605             goto err;
606 
607         /*
608          * libxl__restore_emulator_xenstore_data() is a synchronous function.
609          * Request that our caller queues another action for us.
610          */
611         further_action_needed = true;
612         break;
613 
614     case REC_TYPE_EMULATOR_CONTEXT:
615         if (dcs->guest_config->b_info.type != LIBXL_DOMAIN_TYPE_HVM) {
616             rc = ERROR_FAIL;
617             LOG(ERROR,
618                 "Received an emulator context record when none was expected");
619             goto err;
620         }
621 
622         write_emulator_blob(egc, stream, rec);
623         break;
624 
625     case REC_TYPE_CHECKPOINT_END:
626         if (!stream->in_checkpoint) {
627             LOG(ERROR, "Unexpected CHECKPOINT_END record in stream");
628             rc = ERROR_FAIL;
629             goto err;
630         }
631         checkpoint_done(egc, stream, 0);
632         break;
633 
634     case REC_TYPE_CHECKPOINT_STATE:
635         if (!stream->in_checkpoint_state) {
636             LOG(ERROR, "Unexpected CHECKPOINT_STATE record in stream");
637             rc = ERROR_FAIL;
638             goto err;
639         }
640 
641         srcs = rec->body;
642         checkpoint_state_done(egc, stream, srcs->id);
643         break;
644 
645     default:
646         LOG(ERROR, "Unrecognised record 0x%08x", rec->hdr.type);
647         rc = ERROR_FAIL;
648         goto err;
649     }
650 
651     assert(!rc);
652     free_record(rec);
653     return further_action_needed;
654 
655  err:
656     assert(rc);
657     free_record(rec);
658     stream_complete(egc, stream, rc);
659     return false;
660 }
661 
/*
 * Start writing the payload of an EMULATOR_CONTEXT record to the
 * device model restore file for the guest.
 *
 * The record must be at least as long as the emulator sub-header; the
 * bytes following that header are queued on a dedicated datacopier
 * (stream->emu_dc), and write_emulator_done() is called when the copy
 * finishes.  Any failure is reported through stream_complete().
 *
 * The caller keeps ownership of 'rec'.
 */
static void write_emulator_blob(libxl__egc *egc,
                                libxl__stream_read_state *stream,
                                libxl__sr_record_buf *rec)
{
    libxl__domain_create_state *dcs = stream->dcs;
    libxl__datacopier_state *dc = &stream->emu_dc;
    libxl__sr_emulator_hdr *emu_hdr;
    STATE_AO_GC(stream->ao);
    char path[256];
    int rc = 0, writefd, len;

    if (rec->hdr.length < sizeof(*emu_hdr)) {
        rc = ERROR_FAIL;
        LOG(ERROR, "Emulator record too short to contain header");
        goto err;
    }
    emu_hdr = rec->body;

    /* Bounded formatting: never write past the end of path[]. */
    len = snprintf(path, sizeof(path),
                   LIBXL_DEVICE_MODEL_RESTORE_FILE".%u", dcs->guest_domid);
    if (len < 0 || (size_t)len >= sizeof(path)) {
        rc = ERROR_FAIL;
        LOG(ERROR, "Emulator restore file path too long");
        goto err;
    }

    assert(stream->emu_carefd == NULL);
    libxl__carefd_begin();
    writefd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0600);
    stream->emu_carefd = libxl__carefd_opened(CTX, writefd);

    if (writefd == -1) {
        rc = ERROR_FAIL;
        LOGE(ERROR, "unable to open %s", path);
        goto err;
    }

    FILLZERO(*dc);
    dc->ao         = stream->ao;
    dc->writewhat  = "qemu save file";
    dc->copywhat   = "restore v2 stream";
    dc->writefd    = writefd;
    dc->readfd     = -1;
    dc->maxsz      = -1;
    dc->callback   = write_emulator_done;

    rc = libxl__datacopier_start(dc);
    if (rc)
        goto err;

    /* Queue everything after the emulator sub-header for writing. */
    libxl__datacopier_prefixdata(egc, dc,
                                 rec->body + sizeof(*emu_hdr),
                                 rec->hdr.length - sizeof(*emu_hdr));
    return;

 err:
    assert(rc);
    stream_complete(egc, stream, rc);
}
715 
/*
 * Datacopier callback for the emulator blob write.  The output file is
 * closed regardless of the outcome; on success the stream carries on,
 * otherwise it is completed with the error.
 */
static void write_emulator_done(libxl__egc *egc,
                                libxl__datacopier_state *dc,
                                int rc, int onwrite, int errnoval)
{
    libxl__stream_read_state *stream = CONTAINER_OF(dc, *stream, emu_dc);
    STATE_AO_GC(dc->ao);

    /* The file is finished with, whatever the result. */
    libxl__carefd_close(stream->emu_carefd);
    stream->emu_carefd = NULL;

    if (rc) {
        stream_complete(egc, stream, rc);
        return;
    }

    stream_continue(egc, stream);
}
736 
737 /*----- Success/error/cleanup handling. -----*/
738 
/*
 * Route completion of the stream to the appropriate handler.
 *
 * While a checkpoint (or checkpoint-state exchange) is in progress only
 * errors may be reported here, and they are forwarded to the matching
 * checkpoint handler.  Otherwise the stream is torn down through
 * stream_done().
 */
static void stream_complete(libxl__egc *egc,
                            libxl__stream_read_state *stream, int rc)
{
    assert(stream->running);

    if (stream->in_checkpoint) {
        assert(rc);

        /*
         * An error during a checkpoint is handed back to libxc; the
         * failure then returns to us by way of
         * libxl__xc_domain_restore_done().
         */
        checkpoint_done(egc, stream, rc);
    } else if (stream->in_checkpoint_state) {
        assert(rc);

        /*
         * An error during a checkpoint is handed back to libxc; the
         * failure then returns to us by way of
         * 1. normal stream
         *    libxl__xc_domain_restore_done()
         * 2. back_channel stream
         *    libxl__stream_read_abort()
         */
        checkpoint_state_done(egc, stream, rc);
    } else {
        stream_done(egc, stream, rc);
    }
}
773 
/*
 * Report the outcome of a checkpoint through
 * stream->checkpoint_callback() and return the stream to the normal
 * phase.
 *
 * rc == 0 maps to XGR_CHECKPOINT_SUCCESS.  A failure while still
 * buffering maps to XGR_CHECKPOINT_FAILOVER; any other failure maps to
 * XGR_CHECKPOINT_ERROR.
 */
static void checkpoint_done(libxl__egc *egc,
                            libxl__stream_read_state *stream, int rc)
{
    int outcome = XGR_CHECKPOINT_ERROR;

    assert(stream->in_checkpoint);

    if (rc == 0)
        outcome = XGR_CHECKPOINT_SUCCESS;
    else if (stream->phase == SRS_PHASE_BUFFERING)
        outcome = XGR_CHECKPOINT_FAILOVER;

    /* The callback runs before the checkpoint state is cleared. */
    stream->checkpoint_callback(egc, stream, outcome);

    stream->phase = SRS_PHASE_NORMAL;
    stream->in_checkpoint = false;
}
793 
/*
 * Final teardown of the stream: mark it as no longer running, release
 * every resource it still owns, and (for non back channel streams)
 * join with the other tasks through check_all_finished().
 *
 * Must only be called on a running stream which is outside of any
 * checkpoint.
 */
static void stream_done(libxl__egc *egc,
                        libxl__stream_read_state *stream, int rc)
{
    libxl__sr_record_buf *rec, *trec;

    assert(stream->running);
    assert(!stream->in_checkpoint);
    assert(!stream->in_checkpoint_state);
    stream->running = false;

    /*
     * free_record() accepts NULL.  Clear the pointer afterwards so
     * that no dangling reference to the freed record is left behind.
     */
    free_record(stream->incoming_record);
    stream->incoming_record = NULL;

    /* As in write_emulator_done(): forget the carefd once closed. */
    if (stream->emu_carefd) {
        libxl__carefd_close(stream->emu_carefd);
        stream->emu_carefd = NULL;
    }

    /*
     * If we started a conversion helper, we took ownership of its carefd.
     * NOTE(review): chs.v2_carefd is deliberately not cleared here, as
     * the conversion helper code which may inspect it is not visible
     * from this function - confirm whether it should be.
     */
    if (stream->chs.v2_carefd)
        libxl__carefd_close(stream->chs.v2_carefd);

    /* The record queue had better be empty if the stream believes
     * itself to have been successful. */
    assert(LIBXL_STAILQ_EMPTY(&stream->record_queue) || stream->rc);

    LIBXL_STAILQ_FOREACH_SAFE(rec, &stream->record_queue, entry, trec)
        free_record(rec);
    /* Every entry has been freed; reset the head to an empty queue. */
    LIBXL_STAILQ_INIT(&stream->record_queue);

    /*
     * check_all_finished() is skipped for a back channel stream:
     * 1. stream->running has just been set to false, so the stream
     *    itself is no longer in use.
     * 2. A back channel stream is only used by the primary (save side)
     *    to read records sent by the secondary (restore side), so it
     *    has no restore helper.
     * 3. Back channel streams do not support legacy streams, so there
     *    is no conversion helper either.
     * Hence there is nothing to join in that case.
     */
    if (!stream->back_channel)
        check_all_finished(egc, stream, rc);
}
835 
/*
 * Callback associated with the save helper task, run when the libxc
 * restore finishes.
 *
 * A non-zero 'rc', or a non-zero 'retval' (logged using 'errnoval'),
 * is treated as failure.  The result is fed to check_all_finished();
 * if the stream is still in use afterwards, the next step depends on
 * the kind of checkpointed stream being restored.
 */
void libxl__xc_domain_restore_done(libxl__egc *egc, void *dcs_void,
                                   int rc, int retval, int errnoval)
{
    libxl__domain_create_state *dcs = dcs_void;
    libxl__stream_read_state *stream = &dcs->srs;
    STATE_AO_GC(dcs->ao);

    /* convenience aliases */
    const int checkpointed_stream = dcs->restore_params.checkpointed_stream;

    if (!rc && retval) {
        LOGEV(ERROR, errnoval, "restoring domain");
        rc = ERROR_FAIL;
    }

    check_all_finished(egc, stream, rc);

    /*
     * We are the save helper's callback, not the stream's.  Whether
     * the stream is still alive is unknown; check_all_finished() may
     * have torn it down beneath us.  If so, no further work may be
     * done on it.
     */
    if (!libxl__stream_read_inuse(stream))
        return;

    switch (checkpointed_stream) {
    case LIBXL_CHECKPOINTED_STREAM_COLO:
        if (!stream->completion_callback) {
            /* failover, just close the stream */
            stream_complete(egc, stream, 0);
        } else {
            /*
             * restore, just build the secondary vm, don't close
             * the stream
             */
            stream->completion_callback(egc, stream, 0);
        }
        break;

    case LIBXL_CHECKPOINTED_STREAM_REMUS:
        /*
         * Failover from primary.  The domain state currently sits at
         * a consistent checkpoint: complete the stream, so that
         * stream->completion_callback() gets called to resume the
         * guest.
         */
        stream_complete(egc, stream, 0);
        break;

    case LIBXL_CHECKPOINTED_STREAM_NONE:
        /*
         * Libxc is done with the stream; go back to reading libxl
         * records from it.
         */
        stream_continue(egc, stream);
        break;
    }
}
896 
/*
 * Callback for the conversion helper.  Recovers the read stream that
 * embeds 'chs' and forwards the helper's rc to check_all_finished().
 */
static void conversion_done(libxl__egc *egc,
                            libxl__conversion_helper_state *chs, int rc)
{
    /* 'chs' is the member of that name inside the stream state. */
    libxl__stream_read_state *owner = CONTAINER_OF(chs, *owner, chs);

    check_all_finished(egc, owner, rc);
}
904 
/*
 * Common completion point for the stream and its helper tasks.
 *
 * The first non-zero rc seen is latched into stream->rc and triggers an
 * abort of the stream, the save helper and the conversion helper.  The
 * completion callback (if there is one) is invoked with stream->rc, but
 * only once none of those three is in use any more.
 */
static void check_all_finished(libxl__egc *egc,
                               libxl__stream_read_state *stream, int rc)
{
    STATE_AO_GC(stream->ao);
    bool still_busy;

    /*
     * On failure, the _abort() calls below may cancel synchronously on
     * top of us, or asynchronously at some later point.
     *
     * If every _abort() cancels synchronously, the completion callback
     * could end up being called twice: once on behalf of the first
     * error and once from the last stacked abort, since both would
     * observe that all of the tasks have stopped.
     *
     * Hence a stacked re-entry into this function never fires the
     * completion callback.  The outermost instance completes once the
     * stack has unwound.
     */
    if (stream->sync_teardown) {
        return;
    }

    if (rc && !stream->rc) {
        /* First reported failure. Tear everything down. */
        stream->rc = rc;

        stream->sync_teardown = true;
        libxl__stream_read_abort(egc, stream, rc);
        libxl__save_helper_abort(egc, &stream->shs);
        libxl__conversion_helper_abort(egc, &stream->chs, rc);
        stream->sync_teardown = false;
    }

    /* Don't fire the callback until all our parallel tasks have stopped. */
    still_busy = libxl__stream_read_inuse(stream) ||
                 libxl__save_helper_inuse(&stream->shs) ||
                 libxl__conversion_helper_inuse(&stream->chs);
    if (still_busy) {
        return;
    }

    /* back channel stream doesn't have completion_callback() */
    if (stream->completion_callback) {
        stream->completion_callback(egc, stream, stream->rc);
    }
}
949 
950 /*----- Checkpoint state handlers -----*/
951 
/*
 * Put the stream into the "checkpoint state" mode and start reading a
 * record via setup_read_record().
 *
 * The stream must be running, and must be neither in a checkpoint nor
 * already in the checkpoint state mode; each condition is asserted.
 */
void libxl__stream_read_checkpoint_state(libxl__egc *egc,
                                         libxl__stream_read_state *stream)
{
    /* Preconditions. */
    assert(stream->running);
    assert(!stream->in_checkpoint);
    assert(!stream->in_checkpoint_state);

    /* Flag the mode first, then kick off the read. */
    stream->in_checkpoint_state = true;
    setup_read_record(egc, stream);
}
962 
/*
 * Leave the "checkpoint state" mode and report 'rc' to the stream's
 * checkpoint_callback.  Asserts that the mode was actually active.
 */
static void checkpoint_state_done(libxl__egc *egc,
                                  libxl__stream_read_state *stream, int rc)
{
    assert(stream->in_checkpoint_state);

    /* The flag is cleared before the callback runs. */
    stream->in_checkpoint_state = false;

    stream->checkpoint_callback(egc, stream, rc);
}
970 
971 /*
972  * Local variables:
973  * mode: C
974  * c-basic-offset: 4
975  * indent-tabs-mode: nil
976  * End:
977  */
978