1 /*
2 * Copyright (C) 2015 Citrix Ltd.
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU Lesser General Public License as published
6 * by the Free Software Foundation; version 2.1 only. with the special
7 * exception on linking described in file LICENSE.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Lesser General Public License for more details.
13 */
14
15 #include "libxl_osdeps.h" /* must come before any other headers */
16
17 #include "libxl_internal.h"
18
19 /*
20 * Infrastructure for writing a domain to a libxl migration v2 stream.
21 *
22 * Entry points from outside:
23 * - libxl__stream_write_start()
24 * - Start writing a stream from the start.
25 * - libxl__stream_write_start_checkpoint()
26 * - Write the records which form a checkpoint into a stream.
27 *
28 * In normal operation, there are two tasks running at once; this
29 * stream processing, and the libxl-save-helper. check_all_finished()
30 * is used to join all the tasks in both success and error cases.
31 *
32 * Nomenclature for event callbacks:
33 * - $FOO_done(): Completion callback for $FOO
34 * - write_$FOO(): Set up the datacopier to write a $FOO
35 * - $BAR_header(): A $BAR record header only
36 * - $BAR_record(): A complete $BAR record with header and content
37 *
38 * The main loop for a plain VM writes:
39 * - Stream header
40 * - Libxc record
41 * - (optional) Emulator xenstore record
42 * - if (hvm)
43 * - Emulator context record
44 * - End record
45 *
 * For a checkpointed stream, there is a second loop which is triggered by a
 * save-helper checkpoint callback. It writes:
48 * - (optional) Emulator xenstore record
49 * - if (hvm)
50 * - Emulator context record
51 * - Checkpoint end record
52 *
 * For a back channel stream:
 * - libxl__stream_write_start()
 *    - Sets up the stream and puts it into the running state; no stream
 *      header is written.
 *
 * - libxl__stream_write_checkpoint_state() writes a checkpoint state
 *   record. Once the record has been written out,
 *   stream->checkpoint_callback() is called to return to the caller.
59 */
60
/*
 * Success/error/cleanup handling.
 *
 * stream_success() and stream_complete() are the exits from the event
 * chains below; stream_done() releases the stream's resources, and
 * check_all_finished() joins the stream task with the save helper.
 */
static void stream_success(libxl__egc *egc,
                           libxl__stream_write_state *stream);
static void stream_complete(libxl__egc *egc,
                            libxl__stream_write_state *stream, int rc);
static void stream_done(libxl__egc *egc,
                        libxl__stream_write_state *stream, int rc);
static void checkpoint_done(libxl__egc *egc,
                            libxl__stream_write_state *stream,
                            int rc);
static void check_all_finished(libxl__egc *egc,
                               libxl__stream_write_state *stream, int rc);

/* Event chain for a plain VM, listed in the order the events occur. */
static void stream_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval);
static void libxc_header_done(libxl__egc *egc,
                              libxl__stream_write_state *stream);
/* libxl__xc_domain_save_done() lives here, event-order wise. */
static void write_emulator_xenstore_record(libxl__egc *egc,
                                           libxl__stream_write_state *stream);
static void emulator_xenstore_record_done(libxl__egc *egc,
                                          libxl__stream_write_state *stream);
static void write_emulator_context_record(libxl__egc *egc,
                                          libxl__stream_write_state *stream);
static void emulator_context_read_done(libxl__egc *egc,
                                       libxl__datacopier_state *dc,
                                       int rc, int onwrite, int errnoval);
static void emulator_context_record_done(libxl__egc *egc,
                                         libxl__stream_write_state *stream);
static void write_end_record(libxl__egc *egc,
                             libxl__stream_write_state *stream);

/*
 * Event chain unique to checkpointed streams.  These replace
 * write_end_record() while stream->in_checkpoint is set.
 */
static void write_checkpoint_end_record(libxl__egc *egc,
                                        libxl__stream_write_state *stream);
static void checkpoint_end_record_done(libxl__egc *egc,
                                       libxl__stream_write_state *stream);

/*
 * Checkpoint state record: completion handling for
 * libxl__stream_write_checkpoint_state().
 */
static void write_checkpoint_state_done(libxl__egc *egc,
                                        libxl__stream_write_state *stream);
static void checkpoint_state_done(libxl__egc *egc,
                                  libxl__stream_write_state *stream, int rc);

/*----- Helpers -----*/

/* Datacopier callback installed by setup_generic_write() for every record. */
static void write_done(libxl__egc *egc,
                       libxl__datacopier_state *dc,
                       int rc, int onwrite, int errnoval);
112
113 /* Generic helper to set up writing some data to the stream. */
setup_generic_write(libxl__egc * egc,libxl__stream_write_state * stream,const char * what,libxl__sr_rec_hdr * hdr,libxl__sr_emulator_hdr * emu_hdr,void * body,sws_record_done_cb cb)114 static void setup_generic_write(libxl__egc *egc,
115 libxl__stream_write_state *stream,
116 const char *what,
117 libxl__sr_rec_hdr *hdr,
118 libxl__sr_emulator_hdr *emu_hdr,
119 void *body,
120 sws_record_done_cb cb)
121 {
122 static const uint8_t zero_padding[1U << REC_ALIGN_ORDER] = { 0 };
123
124 libxl__datacopier_state *dc = &stream->dc;
125 int rc;
126
127 assert(stream->record_done_callback == NULL);
128
129 dc->writewhat = what;
130 dc->used = 0;
131 dc->callback = write_done;
132 rc = libxl__datacopier_start(dc);
133
134 if (rc) {
135 stream_complete(egc, stream, rc);
136 return;
137 }
138
139 size_t padsz = ROUNDUP(hdr->length, REC_ALIGN_ORDER) - hdr->length;
140 uint32_t length = hdr->length;
141
142 /* Insert header */
143 libxl__datacopier_prefixdata(egc, dc, hdr, sizeof(*hdr));
144
145 /* Optional emulator sub-header */
146 if (emu_hdr) {
147 assert(length >= sizeof(*emu_hdr));
148 libxl__datacopier_prefixdata(egc, dc, emu_hdr, sizeof(*emu_hdr));
149 length -= sizeof(*emu_hdr);
150 }
151
152 /* Optional body */
153 if (body)
154 libxl__datacopier_prefixdata(egc, dc, body, length);
155
156 /* Any required padding */
157 if (padsz > 0)
158 libxl__datacopier_prefixdata(egc, dc,
159 zero_padding, padsz);
160 stream->record_done_callback = cb;
161 }
162
163 /* Helper to set up writing a regular record to the stream. */
setup_write(libxl__egc * egc,libxl__stream_write_state * stream,const char * what,libxl__sr_rec_hdr * hdr,void * body,sws_record_done_cb cb)164 static void setup_write(libxl__egc *egc,
165 libxl__stream_write_state *stream,
166 const char *what,
167 libxl__sr_rec_hdr *hdr,
168 void *body,
169 sws_record_done_cb cb)
170 {
171 setup_generic_write(egc, stream, what, hdr, NULL, body, cb);
172 }
173
174 /* Helper to set up writing a record with an emulator prefix to the stream. */
setup_emulator_write(libxl__egc * egc,libxl__stream_write_state * stream,const char * what,libxl__sr_rec_hdr * hdr,libxl__sr_emulator_hdr * emu_hdr,void * body,sws_record_done_cb cb)175 static void setup_emulator_write(libxl__egc *egc,
176 libxl__stream_write_state *stream,
177 const char *what,
178 libxl__sr_rec_hdr *hdr,
179 libxl__sr_emulator_hdr *emu_hdr,
180 void *body,
181 sws_record_done_cb cb)
182 {
183 assert(stream->emu_sub_hdr.id != EMULATOR_UNKNOWN);
184 setup_generic_write(egc, stream, what, hdr, emu_hdr, body, cb);
185 }
186
187
/*
 * Datacopier completion callback for records queued by
 * setup_generic_write().
 *
 * Clears the stream's pending record_done_callback, then either completes
 * the stream with an error or invokes the saved callback to continue the
 * event chain.
 *
 * rc, onwrite and errnoval are the datacopier's completion status.  Any of
 * them being non-zero is treated as a failure, matching
 * emulator_context_read_done(); the continuation must never run after a
 * write which reported an error through rc alone.
 */
static void write_done(libxl__egc *egc,
                       libxl__datacopier_state *dc,
                       int rc, int onwrite, int errnoval)
{
    libxl__stream_write_state *stream = CONTAINER_OF(dc, *stream, dc);
    STATE_AO_GC(stream->ao);
    sws_record_done_cb cb = stream->record_done_callback;

    /*
     * Clear before calling onwards: the continuation may queue the next
     * record, and setup_generic_write() asserts that none is pending.
     */
    stream->record_done_callback = NULL;

    if (rc || onwrite || errnoval)
        stream_complete(egc, stream, rc ?: ERROR_FAIL);
    else
        cb(egc, stream);
}
203
204 /*----- Entrypoints -----*/
205
/*
 * Put a stream-write state into its idle (not running) state.
 *
 * stream->ao must already be set by the caller; it is propagated to the
 * embedded save helper state.  Every piece of per-run state used by this
 * file is reset, including in_checkpoint_state, which stream_complete(),
 * stream_done() and libxl__stream_write_checkpoint_state() read and assert
 * on and which therefore must never be left uninitialised.
 */
void libxl__stream_write_init(libxl__stream_write_state *stream)
{
    assert(stream->ao);

    stream->shs.ao = stream->ao;
    libxl__save_helper_init(&stream->shs);

    stream->rc = 0;
    stream->running = false;
    stream->in_checkpoint = false;
    stream->in_checkpoint_state = false;
    stream->sync_teardown = false;
    FILLZERO(stream->dc);
    stream->record_done_callback = NULL;
    FILLZERO(stream->emu_dc);
    stream->emu_carefd = NULL;
    FILLZERO(stream->emu_rec_hdr);
    FILLZERO(stream->emu_sub_hdr);
    stream->emu_body = NULL;
    stream->device_model_version = LIBXL_DEVICE_MODEL_VERSION_UNKNOWN;
}
226
/*
 * Start writing a stream from the start.
 *
 * Re-initialises the stream state, marks it running and configures the
 * main datacopier.  For a back channel stream nothing further is done
 * here.  Otherwise, for an HVM domain the running device model is looked
 * up (only QEMU upstream is accepted) and recorded in the emulator
 * sub-header, and then the stream header is queued; stream_header_done()
 * continues the chain once it has been written.
 *
 * Failures are reported through stream_complete().
 */
void libxl__stream_write_start(libxl__egc *egc,
                               libxl__stream_write_state *stream)
{
    libxl__datacopier_state *dc = &stream->dc;
    libxl__domain_save_state *dss = stream->dss;
    STATE_AO_GC(stream->ao);
    struct libxl__sr_hdr hdr;
    int rc = 0;

    libxl__stream_write_init(stream);
    stream->running = true;

    /* Common datacopier setup, shared with later record writes. */
    dc->ao = ao;
    dc->readfd = -1;
    dc->writefd = stream->fd;
    dc->maxsz = -1;
    dc->copywhat = "save v2 stream";
    dc->writewhat = "stream header";
    dc->callback = stream_header_done;

    /* A back channel stream writes no stream header. */
    if (stream->back_channel)
        return;

    if (dss->type == LIBXL_DOMAIN_TYPE_HVM) {
        stream->device_model_version =
            libxl__device_model_version_running(gc, dss->domid);

        if (stream->device_model_version !=
            LIBXL_DEVICE_MODEL_VERSION_QEMU_XEN) {
            rc = ERROR_FAIL;
            LOGD(ERROR, dss->domid, "Unknown emulator for HVM domain");
            goto err;
        }

        stream->emu_sub_hdr.id = EMULATOR_QEMU_UPSTREAM;
        stream->emu_sub_hdr.index = 0;
    }

    rc = libxl__datacopier_start(dc);
    if (rc)
        goto err;

    /* The stream header fields are stored big-endian. */
    FILLZERO(hdr);
    hdr.ident   = htobe64(RESTORE_STREAM_IDENT);
    hdr.version = htobe32(RESTORE_STREAM_VERSION);
    hdr.options = htobe32(0);

    libxl__datacopier_prefixdata(egc, dc, &hdr, sizeof(hdr));
    return;

 err:
    assert(rc);
    stream_complete(egc, stream, rc);
}
283
/*
 * Write the records which form a checkpoint into a running stream.
 *
 * The stream must be running, must not already be in a checkpoint, and
 * must not be a back channel stream.  The chain starts at the emulator
 * xenstore record and, because in_checkpoint is set, ends with a
 * checkpoint end record rather than an end record.
 */
void libxl__stream_write_start_checkpoint(libxl__egc *egc,
                                          libxl__stream_write_state *stream)
{
    assert(stream->running);
    assert(!stream->in_checkpoint);
    assert(!stream->back_channel);

    stream->in_checkpoint = true;
    write_emulator_xenstore_record(egc, stream);
}
294
/*
 * Abort a stream with the (non-zero) error rc.
 *
 * Does nothing if the stream is not currently running.
 */
void libxl__stream_write_abort(libxl__egc *egc,
                               libxl__stream_write_state *stream, int rc)
{
    assert(rc);

    if (!stream->running)
        return;

    stream_complete(egc, stream, rc);
}
303
304 /*----- Event logic -----*/
305
/*
 * Datacopier callback for the stream header queued by
 * libxl__stream_write_start().
 *
 * On success, queues the (body-less) libxc context record header and
 * continues in libxc_header_done().  On failure, completes the stream
 * with rc, or ERROR_FAIL if rc is zero.
 */
static void stream_header_done(libxl__egc *egc,
                               libxl__datacopier_state *dc,
                               int rc, int onwrite, int errnoval)
{
    libxl__stream_write_state *stream = CONTAINER_OF(dc, *stream, dc);
    STATE_AO_GC(stream->ao);
    struct libxl__sr_rec_hdr libxc_rec;

    if (!rc && !errnoval) {
        FILLZERO(libxc_rec);
        libxc_rec.type = REC_TYPE_LIBXC_CONTEXT;

        setup_write(egc, stream, "libxc header",
                    &libxc_rec, NULL, libxc_header_done);
        return;
    }

    stream_complete(egc, stream, rc ?: ERROR_FAIL);
}
325
/*
 * The libxc context record header has been written: start the libxc
 * domain save using the stream's save helper state.
 */
static void libxc_header_done(libxl__egc *egc,
                              libxl__stream_write_state *stream)
{
    libxl__domain_save_state *const dss = stream->dss;

    libxl__xc_domain_save(egc, dss, &stream->shs);
}
331
/*
 * Completion callback for the libxc domain save.
 *
 * dss_void is the libxl__domain_save_state.  rc is a libxl error code;
 * retval and errnoval describe a failure reported by the save itself.
 * When rc is zero but retval is not, the failure is logged and mapped to
 * ERROR_GUEST_TIMEDOUT (guest did not respond to the suspend request),
 * dss->rc if set, or ERROR_FAIL.
 *
 * The result is fed to check_all_finished(); if the stream is still in
 * use afterwards, a non-checkpointed stream carries on with the emulator
 * records while a checkpointed one is completed.
 */
void libxl__xc_domain_save_done(libxl__egc *egc, void *dss_void,
                                int rc, int retval, int errnoval)
{
    libxl__domain_save_state *dss = dss_void;
    libxl__stream_write_state *stream = &dss->sws;
    STATE_AO_GC(dss->ao);

    if (!rc && retval) {
        LOGEVD(ERROR, errnoval, dss->domid, "saving domain: %s",
               dss->dsps.guest_responded ?
               "domain responded to suspend request" :
               "domain did not respond to suspend request");

        if (!dss->dsps.guest_responded)
            rc = ERROR_GUEST_TIMEDOUT;
        else
            rc = dss->rc ?: ERROR_FAIL;
    }

    check_all_finished(egc, stream, rc);

    /*
     * This function is the callback associated with the save helper
     * task, not the stream task.  We do not know whether the stream is
     * alive, and check_all_finished() may have torn it down around us.
     * If the stream is not still alive, we must not continue any work.
     */
    if (!libxl__stream_write_inuse(stream))
        return;

    if (dss->checkpointed_stream == LIBXL_CHECKPOINTED_STREAM_NONE) {
        write_emulator_xenstore_record(egc, stream);
        return;
    }

    /*
     * For remus, if libxl__xc_domain_save_done() completes,
     * there was an error sending data to the secondary.
     * Resume the primary ASAP.  The caller doesn't care about the
     * return value (please refer to libxl__remus_teardown()).
     */
    stream_complete(egc, stream, 0);
}
378
/*
 * Write the (optional) emulator xenstore record.
 *
 * Non-HVM domains, and HVM domains for which no xenstore data was
 * collected, skip straight to emulator_xenstore_record_done().  Otherwise
 * the collected data is queued behind the emulator sub-header.  A failure
 * to collect the data completes the stream with that error.
 */
static void write_emulator_xenstore_record(libxl__egc *egc,
                                           libxl__stream_write_state *stream)
{
    libxl__domain_save_state *dss = stream->dss;
    STATE_AO_GC(stream->ao);
    struct libxl__sr_rec_hdr xs_rec;
    char *data = NULL;
    uint32_t data_len = 0;
    int rc;

    if (dss->type == LIBXL_DOMAIN_TYPE_HVM) {
        rc = libxl__save_emulator_xenstore_data(dss, &data, &data_len);
        if (rc) {
            stream_complete(egc, stream, rc);
            return;
        }

        if (data_len != 0) {
            /* The record length covers the sub-header as well as the data. */
            FILLZERO(xs_rec);
            xs_rec.type = REC_TYPE_EMULATOR_XENSTORE_DATA;
            xs_rec.length = data_len + sizeof(stream->emu_sub_hdr);

            setup_emulator_write(egc, stream, "emulator xenstore record",
                                 &xs_rec, &stream->emu_sub_hdr, data,
                                 emulator_xenstore_record_done);
            return;
        }
    }

    /* No record to write - all done. */
    emulator_xenstore_record_done(egc, stream);
}
417
/*
 * The emulator xenstore record has been written (or skipped).
 *
 * HVM domains continue with the emulator context record.  Other domains
 * have nothing more to send, so the appropriate end record is written:
 * a checkpoint end record while in a checkpoint, a plain end record
 * otherwise.
 */
static void emulator_xenstore_record_done(libxl__egc *egc,
                                          libxl__stream_write_state *stream)
{
    libxl__domain_save_state *dss = stream->dss;

    if (dss->type == LIBXL_DOMAIN_TYPE_HVM) {
        write_emulator_context_record(egc, stream);
        return;
    }

    if (stream->in_checkpoint)
        write_checkpoint_end_record(egc, stream);
    else
        write_end_record(egc, stream);
}
432
/*
 * Begin writing the emulator context record.
 *
 * Non-HVM domains have no emulator context, so the chain moves straight
 * on to emulator_context_record_done().  Otherwise the device model save
 * file (dss->dsps.dm_savefile) is opened, checked to be a regular file
 * small enough to describe in a record, and read in full into
 * stream->emu_body by the dedicated emu_dc datacopier.
 * emulator_context_read_done() then queues the record itself.
 *
 * The file descriptor and body buffer are kept in the stream (emu_carefd,
 * emu_body) so that stream_done() releases them if the stream fails.
 * All failures are reported through stream_complete().
 */
static void write_emulator_context_record(libxl__egc *egc,
                                          libxl__stream_write_state *stream)
{
    libxl__domain_save_state *dss = stream->dss;
    libxl__datacopier_state *dc = &stream->emu_dc;
    STATE_AO_GC(stream->ao);
    struct libxl__sr_rec_hdr *rec = &stream->emu_rec_hdr;
    struct stat st;
    int rc;

    if (dss->type != LIBXL_DOMAIN_TYPE_HVM) {
        emulator_context_record_done(egc, stream);
        return;
    }

    /* Convenience aliases */
    const char *const filename = dss->dsps.dm_savefile;

    libxl__carefd_begin();
    int readfd = open(filename, O_RDONLY);
    stream->emu_carefd = libxl__carefd_opened(CTX, readfd);
    if (readfd == -1) {
        rc = ERROR_FAIL;
        LOGED(ERROR, dss->domid, "unable to open %s", filename);
        goto err;
    }

    if (fstat(readfd, &st)) {
        rc = ERROR_FAIL;
        LOGED(ERROR, dss->domid, "unable to fstat %s", filename);
        goto err;
    }

    if (!S_ISREG(st.st_mode)) {
        rc = ERROR_FAIL;
        LOGD(ERROR, dss->domid, "%s is not a plain file!", filename);
        goto err;
    }

    /*
     * The record length is a 32-bit quantity which must cover the file
     * contents plus the emulator sub-header.  Refuse a file which does
     * not fit, rather than silently truncating the length and emitting a
     * corrupt record.
     */
    if ((uint64_t)st.st_size > UINT32_MAX - sizeof(stream->emu_sub_hdr)) {
        rc = ERROR_FAIL;
        LOGD(ERROR, dss->domid, "%s is too large to fit in a record",
             filename);
        goto err;
    }

    rec->type = REC_TYPE_EMULATOR_CONTEXT;
    rec->length = st.st_size + sizeof(stream->emu_sub_hdr);
    stream->emu_body = libxl__malloc(NOGC, st.st_size);

    /* Read the whole file into emu_body; nothing is written by this dc. */
    FILLZERO(*dc);
    dc->ao            = stream->ao;
    dc->readwhat      = "qemu save file";
    dc->copywhat      = "save v2 stream";
    dc->readfd        = readfd;
    dc->writefd       = -1;
    dc->maxsz         = -1;
    dc->readbuf       = stream->emu_body;
    dc->bytes_to_read = st.st_size;
    dc->callback      = emulator_context_read_done;

    rc = libxl__datacopier_start(dc);
    if (rc)
        goto err;

    return;

 err:
    assert(rc);
    stream_complete(egc, stream, rc);
}
497
/*
 * Datacopier callback for reading the device model save file.
 *
 * On failure the stream is completed with rc, or ERROR_FAIL if rc is
 * zero.  On success the save file is closed and the emulator context
 * record (header, sub-header and the body just read) is queued.
 */
static void emulator_context_read_done(libxl__egc *egc,
                                       libxl__datacopier_state *dc,
                                       int rc, int onwrite, int errnoval)
{
    libxl__stream_write_state *stream = CONTAINER_OF(dc, *stream, emu_dc);
    STATE_AO_GC(stream->ao);
    const int failed = rc || onwrite || errnoval;

    if (failed) {
        stream_complete(egc, stream, rc ?: ERROR_FAIL);
        return;
    }

    /* The file has been read in full and is no longer needed. */
    libxl__carefd_close(stream->emu_carefd);
    stream->emu_carefd = NULL;

    setup_emulator_write(egc, stream, "emulator record",
                         &stream->emu_rec_hdr, &stream->emu_sub_hdr,
                         stream->emu_body, emulator_context_record_done);
}
519
/*
 * The emulator context record has been written (or skipped): release the
 * body buffer and write whichever end record applies.
 */
static void emulator_context_record_done(libxl__egc *egc,
                                         libxl__stream_write_state *stream)
{
    free(stream->emu_body);
    stream->emu_body = NULL;

    if (!stream->in_checkpoint) {
        write_end_record(egc, stream);
        return;
    }

    write_checkpoint_end_record(egc, stream);
}
531
/*
 * Queue the (body-less) end record which terminates a plain stream;
 * stream_success() runs once it has been written.
 */
static void write_end_record(libxl__egc *egc,
                             libxl__stream_write_state *stream)
{
    struct libxl__sr_rec_hdr end_rec;

    FILLZERO(end_rec);
    end_rec.type = REC_TYPE_END;

    setup_write(egc, stream, "end record", &end_rec, NULL, stream_success);
}
543
/*
 * Queue the (body-less) checkpoint end record which terminates one
 * checkpoint; checkpoint_end_record_done() runs once it has been written.
 */
static void write_checkpoint_end_record(libxl__egc *egc,
                                        libxl__stream_write_state *stream)
{
    struct libxl__sr_rec_hdr ckpt_end_rec;

    FILLZERO(ckpt_end_rec);
    ckpt_end_rec.type = REC_TYPE_CHECKPOINT_END;

    setup_write(egc, stream, "checkpoint end record",
                &ckpt_end_rec, NULL,
                checkpoint_end_record_done);
}
555
/* The checkpoint end record has been written: the checkpoint succeeded. */
static void checkpoint_end_record_done(libxl__egc *egc,
                                       libxl__stream_write_state *stream)
{
    const int success = 0;

    checkpoint_done(egc, stream, success);
}
561
562 /*----- Success/error/cleanup handling. -----*/
563
/* Record-done callback for the end record: complete the stream with rc 0. */
static void stream_success(libxl__egc *egc, libxl__stream_write_state *stream)
{
    const int success = 0;

    stream_complete(egc, stream, success);
}
568
/*
 * Common exit point for the stream's event chains.
 *
 * While a checkpoint or a checkpoint state write is in progress, only
 * errors may arrive here; they are handed to the corresponding *_done()
 * function rather than tearing the stream down directly.  Otherwise the
 * stream is finished via stream_done().
 */
static void stream_complete(libxl__egc *egc,
                            libxl__stream_write_state *stream, int rc)
{
    assert(stream->running);

    if (stream->in_checkpoint) {
        assert(rc);

        /*
         * If an error is encountered while in a checkpoint, pass it
         * back to libxc.  The failure will come back around to us via
         * libxl__xc_domain_save_done()
         */
        checkpoint_done(egc, stream, rc);
    } else if (stream->in_checkpoint_state) {
        assert(rc);

        /*
         * If an error is encountered while writing a checkpoint state
         * record, pass it back to the caller.  The failure will come
         * back around to us via
         *    1. normal stream
         *       libxl__xc_domain_save_done()
         *    2. back_channel stream
         *       libxl__stream_write_abort()
         */
        checkpoint_state_done(egc, stream, rc);
    } else {
        stream_done(egc, stream, rc);
    }
}
603
/*
 * Tear the stream down: mark it as no longer running and release the
 * emulator save file descriptor and body buffer if they are still held.
 *
 * The released pointers are cleared so that no stale value remains in the
 * stream state for a later free() or close to act on.
 *
 * For a normal stream the result rc is then passed to
 * check_all_finished() to join with the save helper.
 */
static void stream_done(libxl__egc *egc,
                        libxl__stream_write_state *stream, int rc)
{
    assert(stream->running);
    assert(!stream->in_checkpoint_state);
    stream->running = false;

    if (stream->emu_carefd) {
        libxl__carefd_close(stream->emu_carefd);
        stream->emu_carefd = NULL;
    }
    free(stream->emu_body);
    stream->emu_body = NULL;

    /*
     * check_all_finished() is not needed for a back channel stream:
     * 1. stream->running has been set to false above, so the stream
     *    itself is not in use.
     * 2. A back channel stream is only used by the secondary (restore
     *    side) to send records back, so it doesn't have a save helper.
     */
    if (!stream->back_channel)
        check_all_finished(egc, stream, rc);
}
627
/*
 * Finish the current checkpoint: leave the in_checkpoint state and report
 * rc through the stream's checkpoint_callback.
 */
static void checkpoint_done(libxl__egc *egc,
                            libxl__stream_write_state *stream,
                            int rc)
{
    assert(stream->in_checkpoint);

    /* Leave the checkpoint before handing control to the callback. */
    stream->in_checkpoint = false;

    stream->checkpoint_callback(egc, stream, rc);
}
637
/*
 * Join point for the stream task and the save helper task.
 *
 * Records the first non-zero rc reported, aborting both tasks when it
 * arrives, and fires the stream's completion_callback (if any) with the
 * recorded rc once neither task is in use any more.
 */
static void check_all_finished(libxl__egc *egc,
                               libxl__stream_write_state *stream,
                               int rc)
{
    STATE_AO_GC(stream->ao);

    /*
     * In the case of a failure, the _abort()'s below might cancel
     * synchronously on top of us, or asynchronously at a later point.
     *
     * We must avoid the situation where all _abort() cancel
     * synchronously and the completion_callback() gets called twice;
     * once by the first error and once by the final stacked abort(),
     * both of whom will find that all of the tasks have stopped.
     *
     * To avoid this problem, any stacked re-entry into this function is
     * ineligible to fire the completion callback.  The outermost
     * instance will take care of completing, once the stack has
     * unwound.
     */
    if (stream->sync_teardown)
        return;

    if (rc && !stream->rc) {
        /* First reported failure.  Tear everything down. */
        stream->rc = rc;

        stream->sync_teardown = true;
        libxl__stream_write_abort(egc, stream, rc);
        libxl__save_helper_abort(egc, &stream->shs);
        stream->sync_teardown = false;
    }

    /* Don't fire the callback until all our parallel tasks have stopped. */
    if (libxl__stream_write_inuse(stream))
        return;
    if (libxl__save_helper_inuse(&stream->shs))
        return;

    /* A back channel stream doesn't have a completion_callback(). */
    if (stream->completion_callback)
        stream->completion_callback(egc, stream, stream->rc);
}
681
682 /*----- checkpoint state -----*/
683
/*
 * Write a checkpoint state record whose body is *srcs.
 *
 * The stream must be running and must be neither in a checkpoint nor
 * already writing a checkpoint state record.  The outcome is reported
 * through stream->checkpoint_callback() by checkpoint_state_done().
 */
void libxl__stream_write_checkpoint_state(libxl__egc *egc,
                                          libxl__stream_write_state *stream,
                                          libxl_sr_checkpoint_state *srcs)
{
    struct libxl__sr_rec_hdr state_rec;

    assert(stream->running);
    assert(!stream->in_checkpoint);
    assert(!stream->in_checkpoint_state);

    stream->in_checkpoint_state = true;

    FILLZERO(state_rec);
    state_rec.type = REC_TYPE_CHECKPOINT_STATE;
    state_rec.length = sizeof(*srcs);

    setup_write(egc, stream, "checkpoint state",
                &state_rec, srcs,
                write_checkpoint_state_done);
}
702
/* The checkpoint state record has been written: report success. */
static void write_checkpoint_state_done(libxl__egc *egc,
                                        libxl__stream_write_state *stream)
{
    const int success = 0;

    checkpoint_state_done(egc, stream, success);
}
708
/*
 * Finish writing a checkpoint state record: leave the in_checkpoint_state
 * state and report rc through the stream's checkpoint_callback.
 */
static void checkpoint_state_done(libxl__egc *egc,
                                  libxl__stream_write_state *stream, int rc)
{
    assert(stream->in_checkpoint_state);

    /* Leave the state before handing control to the callback. */
    stream->in_checkpoint_state = false;

    stream->checkpoint_callback(egc, stream, rc);
}
716
717 /*
718 * Local variables:
719 * mode: C
720 * c-basic-offset: 4
721 * indent-tabs-mode: nil
722 * End:
723 */
724