1 /* block-remus.c
2 *
3 * This disk sends all writes to a backup via a network interface before
4 * passing them to an underlying device.
5 * The backup is a bit more complicated:
6 * 1. It applies all incoming writes to a ramdisk.
7 * 2. When a checkpoint request arrives, it moves the ramdisk to
8 * a committing state and uses a new ramdisk for subsequent writes.
9 * It also acknowledges the request, to let the sender know it can
10 * release output.
11 * 3. The ramdisk flushes its contents to the underlying driver.
12 * 4. At failover, the backup waits for the in-flight ramdisk (if any) to
13 * drain before letting the domain be activated.
14 *
15 * The driver determines whether it is the client or server by attempting
16 * to bind to the replication address. If the address is not local,
17 * the driver acts as client.
18 *
19 * The following messages are defined for the replication stream:
20 * 1. write request
21 * "wreq" 4
22 * num_sectors 4
23 * sector 8
24 * buffer (num_sectors * sector_size)
25 * 2. submit request (may be used as a barrier)
26 * "sreq" 4
27 * 3. commit request
28 * "creq" 4
29 * After a commit request, the client must wait for a completion message:
30 * 4. completion
31 * "done" 4
32 */
33
34 /* due to architectural choices in tapdisk, block-buffer is forced to
35 * reimplement some code which is meant to be private */
36 #include "tapdisk.h"
37 #include "tapdisk-server.h"
38 #include "tapdisk-driver.h"
39 #include "tapdisk-interface.h"
40 #include "hashtable.h"
41 #include "hashtable_itr.h"
42 #include "hashtable_utility.h"
43
44 #include <errno.h>
45 #include <inttypes.h>
46 #include <fcntl.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/time.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netdb.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <sys/param.h>
57 #include <unistd.h>
58 #include <sys/stat.h>
59
60 /* timeout for reads and writes in ms */
61 #define HEARTBEAT_MS 1000
62 #define RAMDISK_HASHSIZE 128
63
64 /* connect retry timeout (seconds) */
65 #define REMUS_CONNRETRY_TIMEOUT 10
66
67 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
68
69 enum tdremus_mode {
70 mode_invalid = 0,
71 mode_unprotected,
72 mode_primary,
73 mode_backup
74 };
75
76 struct tdremus_req {
77 uint64_t sector;
78 int nb_sectors;
79 char buf[4096];
80 };
81
82 struct req_ring {
83 /* waste one slot to distinguish between empty and full */
84 struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
85 unsigned int head;
86 unsigned int tail;
87 };
88
89 /* TODO: This isn't very pretty, but to properly generate our own treqs (needed
90 * by the backup) we need to know our td_vbd_t and td_image_t (blktap2
91 * internals). As a proper fix, we should consider extending the tapdisk
92 * interface with a td_create_request() function, or something similar.
93 *
94 * For now, we just grab the vbd in the td_open() command, and the td_image_t
95 * from the first read request.
96 */
97 td_vbd_t *device_vbd = NULL;
98 td_image_t *remus_image = NULL;
99 struct tap_disk tapdisk_remus;
100
101 struct ramdisk {
102 size_t sector_size;
103 struct hashtable* h;
104 /* when a ramdisk is flushed, h is given a new empty hash for writes
105 * while the old ramdisk (prev) is drained asynchronously.
106 */
107 struct hashtable* prev;
108 /* count of outstanding requests to the base driver */
109 size_t inflight;
110 /* prev holds the requests to be flushed, while inprogress holds
111 * requests being flushed. When requests complete, they are removed
112 * from inprogress.
113 * Whenever a new flush is merged with an ongoing flush (i.e., prev),
114 * we have to make sure that none of the new requests overlap with
115 * ones in "inprogress". If any do, keep them back in prev and don't issue
116 * IO for them until the current flush finishes. If we allowed such IO to proceed,
117 * we might end up with two "overlapping" requests in the disk's queue and
118 * the disk may not offer any guarantee on which one is written first.
119 * IOW, make sure we don't violate write-after-write time ordering.
120 *
121 */
122 struct hashtable* inprogress;
123 };
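/* A rough sketch (not from the original comments) of how the three hashtables
 * above are used, based on ramdisk_start_flush(), ramdisk_flush() and
 * replicated_write_callback() below:
 *
 *   incoming "wreq"   ->  sector data is stored in h
 *   commit ("creq")   ->  h becomes prev (or is merged into an existing prev)
 *                         and a fresh h is created for subsequent writes
 *   flush             ->  sectors move from prev into inprogress as writes
 *                         are issued to the underlying driver
 *   write completes   ->  sectors are removed from inprogress
 */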
124
125 /* the ramdisk intercepts the original callback for reads and writes.
126 * This holds the original data. */
127 /* Might be worth making this a static array in struct ramdisk to avoid
128 * a malloc per request */
129
130 struct tdremus_state;
131
132 struct ramdisk_cbdata {
133 td_callback_t cb;
134 void* private;
135 char* buf;
136 struct tdremus_state* state;
137 };
138
139 struct ramdisk_write_cbdata {
140 struct tdremus_state* state;
141 char* buf;
142 };
143
144 typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);
145
146 /* poll_fd type for blktap2 fd system. taken from block_log.c */
147 typedef struct poll_fd {
148 int fd;
149 event_id_t id;
150 } poll_fd_t;
151
152 struct tdremus_state {
153 // struct tap_disk* driver;
154 void* driver_data;
155
156 /* XXX: this is needed so that the server can perform operations on
157 * the driver from the stream_fd event handler. fix this. */
158 td_driver_t *tdremus_driver;
159
160 /* TODO: we may wish to replace these two FIFOs with a unix socket */
161 char* ctl_path; /* receive flush instruction here */
162 poll_fd_t ctl_fd; /* io_fd slot for control FIFO */
163 char* msg_path; /* output completion message here */
164 poll_fd_t msg_fd;
165
166 /* replication host */
167 struct sockaddr_in sa;
168 poll_fd_t server_fd; /* server listen port */
169 poll_fd_t stream_fd; /* replication channel */
170
171 /* queue write requests, batch-replicate at submit */
172 struct req_ring write_ring;
173
174 /* ramdisk data*/
175 struct ramdisk ramdisk;
176
177 /* mode methods */
178 enum tdremus_mode mode;
179 int (*queue_flush)(td_driver_t *driver);
180 };
181
182 typedef struct tdremus_wire {
183 uint32_t op;
184 uint64_t id;
185 uint64_t sec;
186 uint32_t secs;
187 } tdremus_wire_t;
188
189 #define TDREMUS_READ "rreq"
190 #define TDREMUS_WRITE "wreq"
191 #define TDREMUS_SUBMIT "sreq"
192 #define TDREMUS_COMMIT "creq"
193 #define TDREMUS_DONE "done"
194 #define TDREMUS_FAIL "fail"
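/* Illustrative sketch only (this struct is not used by the code below): the
 * logical layout of a "wreq" message on the replication stream. In practice,
 * primary_queue_write() and server_do_wreq() send and parse the tag and header
 * fields separately, in host byte order, so this is a reading aid rather than
 * the exact on-wire struct (which would also be affected by padding). */
struct tdremus_wreq_layout {
	char     tag[4];       /* TDREMUS_WRITE ("wreq"), no NUL terminator */
	uint32_t num_sectors;  /* number of sectors in the payload */
	uint64_t sector;       /* starting sector number */
	/* followed by num_sectors * sector_size bytes of write data */
};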
195
196 /* primary read/write functions */
197 static void primary_queue_read(td_driver_t *driver, td_request_t treq);
198 static void primary_queue_write(td_driver_t *driver, td_request_t treq);
199
200 /* backup read/write functions */
201 static void backup_queue_read(td_driver_t *driver, td_request_t treq);
202 static void backup_queue_write(td_driver_t *driver, td_request_t treq);
203
204 /* unprotected read/write functions */
205 static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
206 static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
207
208 static int tdremus_close(td_driver_t *driver);
209
210 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
211 static int ctl_respond(struct tdremus_state *s, const char *response);
212
213 /* ring functions */
214 static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
215 {
216 if (++pos >= MAX_REQUESTS * 2 + 1)
217 return 0;
218
219 return pos;
220 }
221
222 static inline int ring_isempty(struct req_ring* ring)
223 {
224 return ring->head == ring->tail;
225 }
226
227 static inline int ring_isfull(struct req_ring* ring)
228 {
229 return ring_next(ring, ring->tail) == ring->head;
230 }
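/* Hedged sketch, not part of the original code: a push helper in the style of
 * the accessors above, showing how the head/tail convention is meant to be used
 * (one slot is always left empty so that head == tail unambiguously means
 * "empty"). Callers would be expected to check ring_isfull() first. */
static inline void ring_push(struct req_ring* ring, const struct tdremus_req* req)
{
	ring->requests[ring->tail] = *req;
	ring->tail = ring_next(ring, ring->tail);
}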
231 /* Prototype declarations */
232 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
233
234 /* functions to create and submit treq's */
235
236 static void
237 replicated_write_callback(td_request_t treq, int err)
238 {
239 struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
240 td_vbd_request_t *vreq;
241 int i;
242 uint64_t start;
243 vreq = (td_vbd_request_t *) treq.private;
244
245 /* the write failed. for now, let's panic; this is very bad */
246 if (err) {
247 RPRINTF("ramdisk write failed, disk image is not consistent\n");
248 exit(-1);
249 }
250
251 /* The write succeeded. let's pull the vreq off whatever request list
252 * it is on and free() it */
253 list_del(&vreq->next);
254 free(vreq);
255
256 s->ramdisk.inflight--;
257 start = treq.sec;
258 for (i = 0; i < treq.secs; i++) {
259 hashtable_remove(s->ramdisk.inprogress, &start);
260 start++;
261 }
262 free(treq.buf);
263
264 if (!s->ramdisk.inflight && !s->ramdisk.prev) {
265 /* TODO: the ramdisk has been flushed */
266 }
267 }
268
269 static inline int
270 create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
271 {
272 td_request_t treq;
273 td_vbd_request_t *vreq;
274
275 treq.op = TD_OP_WRITE;
276 treq.buf = buf;
277 treq.sec = sec;
278 treq.secs = secs;
279 treq.image = remus_image;
280 treq.cb = replicated_write_callback;
281 treq.cb_data = state;
282 treq.id = 0;
283 treq.sidx = 0;
284
285 vreq = calloc(1, sizeof(td_vbd_request_t));
286 treq.private = vreq;
287
288 if(!vreq)
289 return -1;
290
291 vreq->submitting = 1;
292 INIT_LIST_HEAD(&vreq->next);
293 tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
294
295 /* TODO:
296 * we should probably leave it up to the caller to forward the request */
297 td_forward_request(treq);
298
299 vreq->submitting--;
300
301 return 0;
302 }
303
304
305 /* http://www.concentric.net/~Ttwang/tech/inthash.htm */
306 static unsigned int uint64_hash(void* k)
307 {
308 uint64_t key = *(uint64_t*)k;
309
310 key = (~key) + (key << 18);
311 key = key ^ (key >> 31);
312 key = key * 21;
313 key = key ^ (key >> 11);
314 key = key + (key << 6);
315 key = key ^ (key >> 22);
316
317 return (unsigned int)key;
318 }
319
320 static int rd_hash_equal(void* k1, void* k2)
321 {
322 uint64_t key1, key2;
323
324 key1 = *(uint64_t*)k1;
325 key2 = *(uint64_t*)k2;
326
327 return key1 == key2;
328 }
329
330 static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
331 int nb_sectors, char* buf)
332 {
333 int i;
334 char* v;
335 uint64_t key;
336
337 for (i = 0; i < nb_sectors; i++) {
338 key = sector + i;
339 /* check whether it is queued in a previous flush request */
340 if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) {
341 /* check whether it is an ongoing flush */
342 if (!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key))))
343 return -1;
344 }
345 memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
346 }
347
348 return 0;
349 }
350
351 static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
352 size_t len)
353 {
354 char* v;
355 uint64_t* key;
356
357 if ((v = hashtable_search(h, &sector))) {
358 memcpy(v, buf, len);
359 return 0;
360 }
361
362 if (!(v = malloc(len))) {
363 DPRINTF("ramdisk_write_hash: malloc failed\n");
364 return -1;
365 }
366 memcpy(v, buf, len);
367 if (!(key = malloc(sizeof(*key)))) {
368 DPRINTF("ramdisk_write_hash: error allocating key\n");
369 free(v);
370 return -1;
371 }
372 *key = sector;
373 if (!hashtable_insert(h, key, v)) {
374 DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
375 free(key);
376 free(v);
377 return -1;
378 }
379
380 return 0;
381 }
382
383 static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
384 int nb_sectors, char* buf)
385 {
386 int i, rc;
387
388 for (i = 0; i < nb_sectors; i++) {
389 rc = ramdisk_write_hash(ramdisk->h, sector + i,
390 buf + i * ramdisk->sector_size,
391 ramdisk->sector_size);
392 if (rc)
393 return rc;
394 }
395
396 return 0;
397 }
398
399 static int uint64_compare(const void* k1, const void* k2)
400 {
401 uint64_t u1 = *(uint64_t*)k1;
402 uint64_t u2 = *(uint64_t*)k2;
403
404 /* u1 - u2 is unsigned, so compare explicitly rather than subtracting */
405 return u1 < u2 ? -1 : u1 > u2 ? 1 : 0;
406 }
407
408 /* set psectors to an array of the sector numbers in the hash, returning
409 * the number of entries (or -1 on error) */
410 static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
411 {
412 struct hashtable_itr* itr;
413 uint64_t* sectors;
414 int count;
415
416 if (!(count = hashtable_count(h)))
417 return 0;
418
419 if (!(*psectors = malloc(count * sizeof(uint64_t)))) {
420 DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
421 return -1;
422 }
423 sectors = *psectors;
424
425 itr = hashtable_iterator(h);
426 count = 0;
427 do {
428 sectors[count++] = *(uint64_t*)hashtable_iterator_key(itr);
429 } while (hashtable_iterator_advance(itr));
430 free(itr);
431
432 return count;
433 }
434
435 /*
436 return -1 for OOM
437 return -2 for merge lookup failure
438 return -3 for WAW race
439 return 0 on success.
440 */
441 static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
442 size_t count, char **mergedbuf)
443 {
444 char* buf;
445 char* sector;
446 int i;
447 uint64_t *key;
448 int rc = 0;
449
450 if (!(buf = valloc(count * ramdisk->sector_size))) {
451 DPRINTF("merge_request: allocation failed\n");
452 return -1;
453 }
454
455 for (i = 0; i < count; i++) {
456 if (!(sector = hashtable_search(ramdisk->prev, &start))) {
457 DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
458 free(buf);
459 rc = -2;
460 goto fail;
461 }
462
463 /* Check inprogress requests to avoid waw non-determinism */
464 if (hashtable_search(ramdisk->inprogress, &start)) {
465 DPRINTF("merge_request: WAW RACE on %"PRIu64"\n", start);
466 free(buf);
467 rc = -3;
468 goto fail;
469 }
470 /* Insert req into inprogress (there is a brief period of duplicated hash entries
471 * until they are removed from prev; read tracking will not pick up wrong entries)
472 */
473 if (!(key = malloc(sizeof(*key)))) {
474 DPRINTF("%s: error allocating key\n", __FUNCTION__);
475 free(buf);
476 rc = -1;
477 goto fail;
478 }
479 *key = start;
480 if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
481 DPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n",
482 __FUNCTION__, start);
483 free(key);
484 free(buf);
485 rc = -1;
486 goto fail;
487 }
488 memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
489 start++;
490 }
491
492 *mergedbuf = buf;
493 return 0;
494 fail:
495 for (start--; i >0; i--, start--)
496 hashtable_remove(ramdisk->inprogress, &start);
497 return rc;
498 }
499
500 /* The underlying driver may not handle having the whole ramdisk queued at
501 * once. We queue what we can and let the callbacks attempt to queue more. */
502 /* NOTE: may be called from callback, while dd->private still belongs to
503 * the underlying driver */
504 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
505 {
506 uint64_t* sectors;
507 char* buf = NULL;
508 uint64_t base, batchlen;
509 int i, j, count = 0;
510
511 // RPRINTF("ramdisk flush\n");
512
513 if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
514 return count;
515
516 /* Create the inprogress table if empty */
517 if (!s->ramdisk.inprogress)
518 s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
519 uint64_hash,
520 rd_hash_equal);
521
522 /*
523 RPRINTF("ramdisk: flushing %d sectors\n", count);
524 */
525
526 /* sort and merge sectors to improve disk performance */
527 qsort(sectors, count, sizeof(*sectors), uint64_compare);
528
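	/* Example: sorted sectors { 7, 8, 9, 15 } yield two batches below,
	 * (base 7, batchlen 3) and (base 15, batchlen 1), each merged into a
	 * single contiguous write. */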
529 for (i = 0; i < count;) {
530 base = sectors[i++];
531 while (i < count && sectors[i] == sectors[i-1] + 1)
532 i++;
533 batchlen = sectors[i-1] - base + 1;
534
535 j = merge_requests(&s->ramdisk, base, batchlen, &buf);
536
537 if (j) {
538 RPRINTF("ramdisk_flush: merge_requests failed:%s\n",
539 j == -1? "OOM": (j==-2? "missing sector" : "WAW race"));
540 if (j == -3) continue;
541 free(sectors);
542 return -1;
543 }
544
545 /* NOTE: create_write_request() creates a treq AND forwards it down
546 * the driver chain */
547 // RPRINTF("forwarding write request at %" PRIu64 ", length: %" PRIu64 "\n", base, batchlen);
548 create_write_request(s, base, batchlen, buf);
549 //RPRINTF("write request at %" PRIu64 ", length: %" PRIu64 " forwarded\n", base, batchlen);
550
551 s->ramdisk.inflight++;
552
553 for (j = 0; j < batchlen; j++) {
554 buf = hashtable_search(s->ramdisk.prev, &base);
555 free(buf);
556 hashtable_remove(s->ramdisk.prev, &base);
557 base++;
558 }
559 }
560
561 if (!hashtable_count(s->ramdisk.prev)) {
562 /* everything is in flight */
563 hashtable_destroy(s->ramdisk.prev, 0);
564 s->ramdisk.prev = NULL;
565 }
566
567 free(sectors);
568
569 // RPRINTF("ramdisk flush done\n");
570 return 0;
571 }
572
573 /* flush ramdisk contents to disk */
574 static int ramdisk_start_flush(td_driver_t *driver)
575 {
576 struct tdremus_state *s = (struct tdremus_state *)driver->data;
577 uint64_t* key;
578 char* buf;
579 int rc = 0;
580 int i, j, count, batchlen;
581 uint64_t* sectors;
582
583 if (!hashtable_count(s->ramdisk.h)) {
584 /*
585 RPRINTF("Nothing to flush\n");
586 */
587 return 0;
588 }
589
590 if (s->ramdisk.prev) {
591 /* a flush request issued while a previous flush is still in progress
592 * will merge with the previous request. If you want the previous
593 * request to be consistent, wait for it to complete. */
594 if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
595 return count;
596
597 for (i = 0; i < count; i++) {
598 buf = hashtable_search(s->ramdisk.h, sectors + i);
599 ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
600 s->ramdisk.sector_size);
601 }
602 free(sectors);
603
604 hashtable_destroy (s->ramdisk.h, 0);
605 } else
606 s->ramdisk.prev = s->ramdisk.h;
607
608 /* We create a new hashtable so that new writes can be performed before
609 * the old hashtable is completely drained. */
610 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
611 rd_hash_equal);
612
613 return ramdisk_flush(driver, s);
614 }
615
616
617 static int ramdisk_start(td_driver_t *driver)
618 {
619 struct tdremus_state *s = (struct tdremus_state *)driver->data;
620
621 if (s->ramdisk.h) {
622 RPRINTF("ramdisk already allocated\n");
623 return 0;
624 }
625
626 s->ramdisk.sector_size = driver->info.sector_size;
627 s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
628 rd_hash_equal);
629
630 DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);
631
632 return 0;
633 }
634
635 /* common client/server functions */
636 /* mayberead: Time out after a certain interval. */
637 static int mread(int fd, void* buf, size_t len)
638 {
639 fd_set rfds;
640 int rc;
641 size_t cur = 0;
642 struct timeval tv = {
643 .tv_sec = HEARTBEAT_MS / 1000,
644 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
645 };
646
647 if (!len)
648 return 0;
649
650 /* read first. Only select if read is incomplete. */
651 rc = read(fd, buf, len);
652 while (rc < 0 || cur + rc < len) {
653 if (!rc) {
654 RPRINTF("end-of-file");
655 return -1;
656 }
657 if (rc < 0 && errno != EAGAIN) {
658 RPRINTF("error during read: %s\n", strerror(errno));
659 return -1;
660 }
661 if (rc > 0)
662 cur += rc;
663
664 FD_ZERO(&rfds);
665 FD_SET(fd, &rfds);
666 if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
667 RPRINTF("time out during read\n");
668 return -1;
669 } else if (rc < 0) {
670 RPRINTF("error during select: %d\n", errno);
671 return -1;
672 }
673 rc = read(fd, buf + cur, len - cur);
674 }
675 /*
676 RPRINTF("read %d bytes\n", cur + rc);
677 */
678
679 return 0;
680 }
681
682 static int mwrite(int fd, void* buf, size_t len)
683 {
684 fd_set wfds;
685 size_t cur = 0;
686 int rc;
687 struct timeval tv = {
688 .tv_sec = HEARTBEAT_MS / 1000,
689 .tv_usec = (HEARTBEAT_MS % 1000) * 1000
690 };
691
692 if (!len)
693 return 0;
694
695 /* write first. Only select if the write is incomplete. */
696 rc = write(fd, buf, len);
697 while (rc < 0 || cur + rc < len) {
698 if (!rc) {
699 RPRINTF("end-of-file");
700 return -1;
701 }
702 if (rc < 0 && errno != EAGAIN) {
703 RPRINTF("error during write: %s\n", strerror(errno));
704 return -1;
705 }
706 if (rc > 0)
707 cur += rc;
708
709 FD_ZERO(&wfds);
710 FD_SET(fd, &wfds);
711 if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
712 RPRINTF("time out during write\n");
713 return -1;
714 } else if (rc < 0) {
715 RPRINTF("error during select: %d\n", errno);
716 return -1;
717 }
718 rc = write(fd, buf + cur, len - cur);
719 }
720 /*
721 RPRINTF("wrote %d bytes\n", cur + rc);
722 */
723
724 return 0;
728 }
729
730
731 static void inline close_stream_fd(struct tdremus_state *s)
732 {
733 /* XXX: -2 is magic. replace with macro perhaps? */
734 tapdisk_server_unregister_event(s->stream_fd.id);
735 close(s->stream_fd.fd);
736 s->stream_fd.fd = -2;
737 }
738
739 /* primary functions */
740 static void remus_client_event(event_id_t, char mode, void *private);
741 static void remus_connect_event(event_id_t id, char mode, void *private);
742 static void remus_retry_connect_event(event_id_t id, char mode, void *private);
743
744 static int primary_do_connect(struct tdremus_state *state)
745 {
746 event_id_t id;
747 int fd;
748 int rc;
749 int flags;
750
751 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
752
753 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
754 RPRINTF("could not create client socket: %d\n", errno);
755 return -1;
756 }
757
758 /* make socket nonblocking */
759 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
760 flags = 0;
761 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
762 return -1;
763
764 /* once we have created the socket and populated the address, we can start
765 * our non-blocking connect. rather than duplicating code, we trigger a timeout
766 * on the socket fd, which calls our nonblocking connect code
767 */
768 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
769 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
770 /* TODO: we leak a fd here */
771 return -1;
772 }
773 state->stream_fd.fd = fd;
774 state->stream_fd.id = id;
775 return 0;
776 }
777
778 static int primary_blocking_connect(struct tdremus_state *state)
779 {
780 int fd;
781 int id;
782 int rc;
783 int flags;
784
785 RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
786
787 if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
788 RPRINTF("could not create client socket: %d\n", errno);
789 return -1;
790 }
791
792 do {
793 if ((rc = connect(fd, (struct sockaddr *)&state->sa,
794 sizeof(state->sa))) < 0)
795 {
796 if (errno == ECONNREFUSED) {
797 RPRINTF("connection refused -- retrying in 1 second\n");
798 sleep(1);
799 } else {
800 RPRINTF("connection failed: %d\n", errno);
801 close(fd);
802 return -1;
803 }
804 }
805 } while (rc < 0);
806
807 RPRINTF("client connected\n");
808
809 /* make socket nonblocking */
810 if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
811 flags = 0;
812 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
813 {
814 RPRINTF("error making socket nonblocking\n");
815 close(fd);
816 return -1;
817 }
818
819 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
820 RPRINTF("error registering client event handler: %s\n", strerror(id));
821 close(fd);
822 return -1;
823 }
824
825 state->stream_fd.fd = fd;
826 state->stream_fd.id = id;
827 return 0;
828 }
829
830 /* on read, just pass request through */
831 static void primary_queue_read(td_driver_t *driver, td_request_t treq)
832 {
833 /* just pass read through */
834 td_forward_request(treq);
835 }
836
837 /* TODO:
838 * The primary uses mwrite() to write the contents of a write request to the
839 * backup. This effectively blocks until all data has been copied into a system
840 * buffer or a timeout has occurred. We may wish to instead use tapdisk's
841 * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
842 * and write data in an asynchronous fashion.
843 */
844 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
845 {
846 struct tdremus_state *s = (struct tdremus_state *)driver->data;
847
848 char header[sizeof(uint32_t) + sizeof(uint64_t)];
849 uint32_t *sectors = (uint32_t *)header;
850 uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
851
852 // RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
853
854 /* -1 means we haven't connected yet, -2 means the connection was lost */
855 if(s->stream_fd.fd == -1) {
856 RPRINTF("connecting to backup...\n");
857 primary_blocking_connect(s);
858 }
859
860 *sectors = treq.secs;
861 *sector = treq.sec;
862
863 if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
864 goto fail;
865 if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
866 goto fail;
867
868 if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
869 goto fail;
870
871 td_forward_request(treq);
872
873 return;
874
875 fail:
876 /* switch to unprotected mode and tell tapdisk to retry */
877 RPRINTF("write request replication failed, switching to unprotected mode");
878 switch_mode(s->tdremus_driver, mode_unprotected);
879 td_complete_request(treq, -EBUSY);
880 }
881
882
883 static int client_flush(td_driver_t *driver)
884 {
885 struct tdremus_state *s = (struct tdremus_state *)driver->data;
886
887 // RPRINTF("committing output\n");
888
889 if (s->stream_fd.fd == -1)
890 /* connection not yet established, nothing to flush */
891 return 0;
892
893 if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) {
894 RPRINTF("error flushing output");
895 close_stream_fd(s);
896 return -1;
897 }
898
899 return 0;
900 }
901
902 static int server_flush(td_driver_t *driver)
903 {
904 struct tdremus_state *s = (struct tdremus_state *)driver->data;
905 /*
906 * Nothing to flush at the beginning.
907 */
908 if (!s->ramdisk.prev)
909 return 0;
910 /* Try to flush any remaining requests */
911 return ramdisk_flush(driver, s);
912 }
913
914 static int primary_start(td_driver_t *driver)
915 {
916 struct tdremus_state *s = (struct tdremus_state *)driver->data;
917
918 RPRINTF("activating client mode\n");
919
920 tapdisk_remus.td_queue_read = primary_queue_read;
921 tapdisk_remus.td_queue_write = primary_queue_write;
922 s->queue_flush = client_flush;
923
924 s->stream_fd.fd = -1;
925 s->stream_fd.id = -1;
926
927 return 0;
928 }
929
930 /* timeout callback */
931 static void remus_retry_connect_event(event_id_t id, char mode, void *private)
932 {
933 struct tdremus_state *s = (struct tdremus_state *)private;
934
935 /* do a non-blocking connect */
936 if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
937 && errno != EINPROGRESS)
938 {
939 if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
940 {
941 /* try again in a second */
942 tapdisk_server_unregister_event(s->stream_fd.id);
943 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
944 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
945 return;
946 }
947 s->stream_fd.id = id;
948 }
949 else
950 {
951 /* not recoverable */
952 RPRINTF("error connecting to server: %s\n", strerror(errno));
953 return;
954 }
955 }
956 else
957 {
958 /* the connect returned EINPROGRESS (nonblocking connect); we must wait for the fd to become writable to determine whether the connect succeeded */
959
960 tapdisk_server_unregister_event(s->stream_fd.id);
961 if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
962 RPRINTF("error registering client connection event handler: %s\n", strerror(id));
963 return;
964 }
965 s->stream_fd.id = id;
966 }
967 }
968
969 /* callback when nonblocking connect() is finished */
970 /* called only by primary in unprotected state */
971 static void remus_connect_event(event_id_t id, char mode, void *private)
972 {
973 int socket_errno;
974 socklen_t socket_errno_size;
975 struct tdremus_state *s = (struct tdremus_state *)private;
976
977 /* check to see if the connect succeeded */
978 socket_errno_size = sizeof(socket_errno);
979 if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
980 RPRINTF("error getting socket errno\n");
981 return;
982 }
983
984 RPRINTF("socket connect returned %d\n", socket_errno);
985
986 if(socket_errno)
987 {
988 /* the connect did not succeed */
989
990 if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
991 || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
992 {
993 /* we can probably assume that the backup is down. just try again later */
994 tapdisk_server_unregister_event(s->stream_fd.id);
995 if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
996 RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
997 return;
998 }
999 s->stream_fd.id = id;
1000 }
1001 else
1002 {
1003 RPRINTF("socket connect returned %d, giving up\n", socket_errno);
1004 }
1005 }
1006 else
1007 {
1008 /* the connect succeeded */
1009
1010 /* unregister this function and register a new event handler */
1011 tapdisk_server_unregister_event(s->stream_fd.id);
1012 if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
1013 RPRINTF("error registering client event handler: %s\n", strerror(id));
1014 return;
1015 }
1016 s->stream_fd.id = id;
1017
1018 /* switch from unprotected to protected client */
1019 switch_mode(s->tdremus_driver, mode_primary);
1020 }
1021 }
1022
1023
1024 /* we install this event handler on the primary once we have connected to the backup */
1025 /* wait for "done" message to commit checkpoint */
1026 static void remus_client_event(event_id_t id, char mode, void *private)
1027 {
1028 struct tdremus_state *s = (struct tdremus_state *)private;
1029 char req[5];
1030 int rc;
1031
1032 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1033 /* replication stream closed or otherwise broken (timeout, reset, &c) */
1034 RPRINTF("error reading from backup\n");
1035 close_stream_fd(s);
1036 return;
1037 }
1038
1039 req[4] = '\0';
1040
1041 if (!strcmp(req, TDREMUS_DONE))
1042 /* checkpoint committed, inform msg_fd */
1043 ctl_respond(s, TDREMUS_DONE);
1044 else {
1045 RPRINTF("received unknown message: %s\n", req);
1046 close_stream_fd(s);
1047 }
1048
1049 return;
1050 }
1051
1052 /* backup functions */
1053 static void remus_server_event(event_id_t id, char mode, void *private);
1054
1055 /* returns the socket that receives write requests */
1056 static void remus_server_accept(event_id_t id, char mode, void* private)
1057 {
1058 struct tdremus_state* s = (struct tdremus_state *) private;
1059
1060 int stream_fd;
1061 event_id_t cid;
1062
1063 /* XXX: add address-based black/white list */
1064 if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
1065 RPRINTF("error accepting connection: %d\n", errno);
1066 return;
1067 }
1068
1069 /* TODO: check to see if we are already replicating. if so just close the
1070 * connection (or do something smarter) */
1071 RPRINTF("server accepted connection\n");
1072
1073 /* add tapdisk event for replication stream */
1074 cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
1075 remus_server_event, s);
1076
1077 if(cid < 0) {
1078 RPRINTF("error registering connection event handler: %s\n", strerror(errno));
1079 close(stream_fd);
1080 return;
1081 }
1082
1083 /* store replication file descriptor */
1084 s->stream_fd.fd = stream_fd;
1085 s->stream_fd.id = cid;
1086 }
1087
1088 /* returns -2 if EADDRNOTAVAIL */
1089 static int remus_bind(struct tdremus_state* s)
1090 {
1091 // struct sockaddr_in sa;
1092 int opt;
1093 int rc = -1;
1094
1095 if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1096 RPRINTF("could not create server socket: %d\n", errno);
1097 return rc;
1098 }
1099 opt = 1;
1100 if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
1101 RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
1102
1103 if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
1104 RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
1105 inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
1106 if (errno != EADDRINUSE)
1107 rc = -2;
1108 goto err_sfd;
1109 }
1110 if (listen(s->server_fd.fd, 10)) {
1111 RPRINTF("could not listen on socket: %d\n", errno);
1112 goto err_sfd;
1113 }
1114
1115 /* The socket is now bound to the address and listening, so we may register
1116 * the fd with tapdisk */
1117
1118 if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
1119 s->server_fd.fd, 0,
1120 remus_server_accept, s)) < 0) {
1121 RPRINTF("error registering server connection event handler: %s",
1122 strerror(s->server_fd.id));
1123 goto err_sfd;
1124 }
1125
1126 return 0;
1127
1128 err_sfd:
1129 close(s->server_fd.fd);
1130 s->server_fd.fd = -1;
1131
1132 return rc;
1133 }
1134
1135 /* wait for latest checkpoint to be applied */
1136 static inline int server_writes_inflight(td_driver_t *driver)
1137 {
1138 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1139
1140 if (!s->ramdisk.inflight && !s->ramdisk.prev)
1141 return 0;
1142
1143 return 1;
1144 }
1145
1146 /* Due to block device prefetching this code may be called on the server side
1147 * during normal replication. In this case we must return EBUSY, otherwise the
1148 * domain may be started with stale data.
1149 */
1150 void backup_queue_read(td_driver_t *driver, td_request_t treq)
1151 {
1152 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1153 int i;
1154 if(!remus_image)
1155 remus_image = treq.image;
1156
1157 /* check if this read is queued in any currently ongoing flush */
1158 if (ramdisk_read(&s->ramdisk, treq.sec, treq.secs, treq.buf)) {
1159 /* TODO: Add to pending read hash */
1160 td_forward_request(treq);
1161 } else {
1162 /* complete the request */
1163 td_complete_request(treq, 0);
1164 }
1165 }
1166
1167 /* see above */
1168 void backup_queue_write(td_driver_t *driver, td_request_t treq)
1169 {
1170 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1171
1172 /* on a server write, we know the domain has failed over. we must change our
1173 * state to unprotected and then have the unprotected queue_write function
1174 * handle the write
1175 */
1176
1177 switch_mode(driver, mode_unprotected);
1178 /* TODO: call the appropriate write function rather than return EBUSY */
1179 td_complete_request(treq, -EBUSY);
1180 }
1181
1182 static int backup_start(td_driver_t *driver)
1183 {
1184 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1185 int fd;
1186
1187 if (ramdisk_start(driver) < 0)
1188 return -1;
1189
1190 tapdisk_remus.td_queue_read = backup_queue_read;
1191 tapdisk_remus.td_queue_write = backup_queue_write;
1192 s->queue_flush = server_flush;
1193 /* TODO set flush function */
1194 return 0;
1195 }
1196
1197 static int server_do_wreq(td_driver_t *driver)
1198 {
1199 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1200 static tdremus_wire_t twreq;
1201 char buf[4096];
1202 int len, rc;
1203
1204 char header[sizeof(uint32_t) + sizeof(uint64_t)];
1205 uint32_t *sectors = (uint32_t *) header;
1206 uint64_t *sector = (uint64_t *) &header[sizeof(uint32_t)];
1207
1208 // RPRINTF("received write request\n");
1209
1210 if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
1211 goto err;
1212
1213 len = *sectors * driver->info.sector_size;
1214
1215 //RPRINTF("writing %d sectors (%d bytes) starting at %" PRIu64 "\n", *sectors, len,
1216 // *sector);
1217
1218 if (len > sizeof(buf)) {
1219 /* freak out! */
1220 RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
1221 return -1;
1222 }
1223
1224 if (mread(s->stream_fd.fd, buf, len) < 0)
1225 goto err;
1226
1227 if (ramdisk_write(&s->ramdisk, *sector, *sectors, buf) < 0)
1228 goto err;
1229
1230 return 0;
1231
1232 err:
1233 /* should start failover */
1234 RPRINTF("backup write request error\n");
1235 close_stream_fd(s);
1236
1237 return -1;
1238 }
1239
1240 static int server_do_sreq(td_driver_t *driver)
1241 {
1242 /*
1243 RPRINTF("submit request received\n");
1244 */
1245
1246 return 0;
1247 }
1248
1249 /* at this point, the server can start applying the most recent
1250 * ramdisk. */
1251 static int server_do_creq(td_driver_t *driver)
1252 {
1253 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1254
1255 // RPRINTF("committing buffer\n");
1256
1257 ramdisk_start_flush(driver);
1258
1259 /* XXX this message should not be sent until flush completes! */
1260 if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
1261 return -1;
1262
1263 return 0;
1264 }
1265
1266
1267 /* called when data is pending in s->rfd */
1268 static void remus_server_event(event_id_t id, char mode, void *private)
1269 {
1270 struct tdremus_state *s = (struct tdremus_state *)private;
1271 td_driver_t *driver = s->tdremus_driver;
1272 char req[5];
1273
1274 // RPRINTF("replication data waiting\n");
1275
1276 /* TODO: add a get_connection_by_event_id() function.
1277 * for now we can assume that the fd is s->stream_fd */
1278
1279 if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1280 RPRINTF("error reading server event, activating backup\n");
1281 switch_mode(driver, mode_unprotected);
1282 return;
1283 }
1284
1285 req[4] = '\0';
1286
1287 if (!strcmp(req, TDREMUS_WRITE))
1288 server_do_wreq(driver);
1289 else if (!strcmp(req, TDREMUS_SUBMIT))
1290 server_do_sreq(driver);
1291 else if (!strcmp(req, TDREMUS_COMMIT))
1292 server_do_creq(driver);
1293 else
1294 RPRINTF("unknown request received: %s\n", req);
1295
1296 return;
1297
1298 }
1299
1300 /* unprotected */
1301
1302 void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
1303 {
1304 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1305
1306 /* wait for previous ramdisk to flush before servicing reads */
1307 if (server_writes_inflight(driver)) {
1308 /* for now lets just return EBUSY.
1309 * if there are any left-over requests in prev,
1310 * kick em again.
1311 */
1312 if(!s->ramdisk.inflight) /* nothing in inprogress */
1313 ramdisk_flush(driver, s);
1314
1315 td_complete_request(treq, -EBUSY);
1316 }
1317 else {
1318 /* here we just pass reads through */
1319 td_forward_request(treq);
1320 }
1321 }
1322
1323 /* For a recoverable remus solution we need to log unprotected writes here */
1324 void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
1325 {
1326 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1327
1328 /* wait for previous ramdisk to flush */
1329 if (server_writes_inflight(driver)) {
1330 RPRINTF("queue_write: waiting for queue to drain");
1331 if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
1332 ramdisk_flush(driver, s);
1333 td_complete_request(treq, -EBUSY);
1334 }
1335 else {
1336 // RPRINTF("servicing write request on backup\n");
1337 /* NOTE: DRBD style bitmap tracking could go here */
1338 td_forward_request(treq);
1339 }
1340 }
1341
1342 static int unprotected_start(td_driver_t *driver)
1343 {
1344 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1345
1346 RPRINTF("failure detected, activating passthrough\n");
1347
1348 /* close the replication stream */
1349 close_stream_fd(s);
1350
1351 /* unregister the server socket */
1352 tapdisk_server_unregister_event(s->server_fd.id);
1353
1354 /* close the server socket */
1355 close(s->server_fd.fd);
1356 s->server_fd.fd = -1;
1357
1358 /* install the unprotected read/write handlers */
1359 tapdisk_remus.td_queue_read = unprotected_queue_read;
1360 tapdisk_remus.td_queue_write = unprotected_queue_write;
1361
1362 return 0;
1363 }
1364
1365
1366 /* control */
1367
1368 static inline int resolve_address(const char* addr, struct in_addr* ia)
1369 {
1370 struct hostent* he;
1371 uint32_t ip;
1372
1373 if (!(he = gethostbyname(addr))) {
1374 RPRINTF("error resolving %s: %d\n", addr, h_errno);
1375 return -1;
1376 }
1377
1378 if (!he->h_addr_list[0]) {
1379 RPRINTF("no address found for %s\n", addr);
1380 return -1;
1381 }
1382
1383 /* network byte order */
1384 ip = *((uint32_t**)he->h_addr_list)[0];
1385 ia->s_addr = ip;
1386
1387 return 0;
1388 }
1389
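/* Parse the replication address from the tapdisk "name" argument, which is
 * expected to look like "host:port" (e.g. "192.168.1.2:9000" as an assumed,
 * illustrative value), and store the resolved IPv4 address in state->sa. */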
1390 static int get_args(td_driver_t *driver, const char* name)
1391 {
1392 struct tdremus_state *state = (struct tdremus_state *)driver->data;
1393 char* host;
1394 char* port;
1395 // char* driver_str;
1396 // char* parent;
1397 // int type;
1398 // char* path;
1399 // unsigned long ulport;
1400 // int i;
1401 // struct sockaddr_in server_addr_in;
1402
1403 int gai_status;
1404 int valid_addr;
1405 struct addrinfo gai_hints;
1406 struct addrinfo *servinfo, *servinfo_itr;
1407
1408 memset(&gai_hints, 0, sizeof gai_hints);
1409 gai_hints.ai_family = AF_UNSPEC;
1410 gai_hints.ai_socktype = SOCK_STREAM;
1411
1412 port = strchr(name, ':');
1413 if (!port) {
1414 RPRINTF("missing port in %s\n", name);
1415 return -ENOENT;
1416 }
1417 if (!(host = strndup(name, port - name))) {
1418 RPRINTF("unable to allocate host\n");
1419 return -ENOMEM;
1420 }
1421 port++;
1422
1423 if ((gai_status = getaddrinfo(host, port, &gai_hints, &servinfo)) != 0) {
1424 RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
1425 return -ENOENT;
1426 }
1427
1428 /* TODO: do something smarter here */
1429 valid_addr = 0;
1430 for(servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
1433
1434 if (servinfo_itr->ai_family == AF_INET) {
1435 valid_addr = 1;
1436 memset(&state->sa, 0, sizeof(state->sa));
1437 state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
1438 break;
1439 }
1440 }
1441 freeaddrinfo(servinfo);
1442
1443 if (!valid_addr)
1444 return -ENOENT;
1445
1446 RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
1447
1448 return 0;
1449 }
1450
1451 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
1452 {
1453 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1454 int rc;
1455
1456 if (mode == s->mode)
1457 return 0;
1458
1459 if (s->queue_flush)
1460 if ((rc = s->queue_flush(driver)) < 0) {
1461 // fall back to unprotected mode on error
1462 RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", s->mode, mode);
1463 mode = mode_unprotected;
1464 }
1465
1466 if (mode == mode_unprotected)
1467 rc = unprotected_start(driver);
1468 else if (mode == mode_primary)
1469 rc = primary_start(driver);
1470 else if (mode == mode_backup)
1471 rc = backup_start(driver);
1472 else {
1473 RPRINTF("unknown mode requested: %d\n", mode);
1474 rc = -1;
1475 }
1476
1477 if (!rc)
1478 s->mode = mode;
1479
1480 return rc;
1481 }
1482
1483 static void ctl_request(event_id_t id, char mode, void *private)
1484 {
1485 struct tdremus_state *s = (struct tdremus_state *)private;
1486 td_driver_t *driver = s->tdremus_driver;
1487 char msg[80];
1488 int rc;
1489
1490 // RPRINTF("data waiting on control fifo\n");
1491
1492 if (!(rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */))) {
1493 RPRINTF("0-byte read received, reopening FIFO\n");
1494 /*TODO: we may have to unregister/re-register with tapdisk_server */
1495 close(s->ctl_fd.fd);
1496 RPRINTF("FIFO closed\n");
1497 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1498 RPRINTF("error reopening FIFO: %d\n", errno);
1499 }
1500 return;
1501 }
1502
1503 if (rc < 0) {
1504 RPRINTF("error reading from FIFO: %d\n", errno);
1505 return;
1506 }
1507
1508 /* TODO: need to get driver somehow */
1509 msg[rc] = '\0';
1510 if (!strncmp(msg, "flush", 5)) {
1511 if (s->queue_flush)
1512 if ((rc = s->queue_flush(driver))) {
1513 RPRINTF("error passing flush request to backup");
1514 ctl_respond(s, TDREMUS_FAIL);
1515 }
1516 } else {
1517 RPRINTF("unknown command: %s\n", msg);
1518 }
1519 }
1520
1521 static int ctl_respond(struct tdremus_state *s, const char *response)
1522 {
1523 int rc;
1524
1525 if ((rc = write(s->msg_fd.fd, response, strlen(response))) < 0) {
1526 RPRINTF("error writing notification: %d\n", errno);
1527 close(s->msg_fd.fd);
1528 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0)
1529 RPRINTF("error reopening FIFO: %d\n", errno);
1530 }
1531
1532 return rc;
1533 }
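/* Sketch of the control protocol as implemented by ctl_request() and
 * ctl_respond() above: an external tool writes the string "flush" into the
 * control FIFO (created below as BLKTAP_CTRL_DIR "/remus_<name>"); once the
 * checkpoint has been acknowledged by the backup, "done" (or "fail" on error)
 * is written to the corresponding ".msg" FIFO. */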
1534
1535 /* must be called after the underlying driver has been initialized */
1536 static int ctl_open(td_driver_t *driver, const char* name)
1537 {
1538 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1539 int i, l;
1540
1541 /* first we must ensure that BLKTAP_CTRL_DIR exists */
1542 if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
1543 {
1544 DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
1545 return -1;
1546 }
1547
1548 /* use the device name to create the control fifo path */
1549 if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
1550 return -1;
1551 /* scrub fifo pathname */
1552 for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
1553 if (strchr(":/", s->ctl_path[i]))
1554 s->ctl_path[i] = '_';
1555 }
1556 if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
1557 goto err_ctlfifo;
1558
1559 if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1560 RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
1561 goto err_msgfifo;
1562 }
1563
1564 if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1565 RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
1566 goto err_msgfifo;
1567 }
1568
1569 /* RDWR so that fd doesn't block select when no writer is present */
1570 if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1571 RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
1572 goto err_msgfifo;
1573 }
1574
1575 if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
1576 RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
1577 goto err_openctlfifo;
1578 }
1579
1580 RPRINTF("control FIFO %s\n", s->ctl_path);
1581 RPRINTF("message FIFO %s\n", s->msg_path);
1582
1583 return 0;
1584
1585 err_openctlfifo:
1586 close(s->ctl_fd.fd);
1587 err_msgfifo:
1588 free(s->msg_path);
1589 s->msg_path = NULL;
1590 err_ctlfifo:
1591 free(s->ctl_path);
1592 s->ctl_path = NULL;
1593 return -1;
1594 }
1595
1596 static void ctl_close(td_driver_t *driver)
1597 {
1598 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1599
1600 /* TODO: close *all* connections */
1601
1602 if(s->ctl_fd.fd)
1603 close(s->ctl_fd.fd);
1604
1605 if (s->ctl_path) {
1606 unlink(s->ctl_path);
1607 free(s->ctl_path);
1608 s->ctl_path = NULL;
1609 }
1610 if (s->msg_path) {
1611 unlink(s->msg_path);
1612 free(s->msg_path);
1613 s->msg_path = NULL;
1614 }
1615 }
1616
1617 static int ctl_register(struct tdremus_state *s)
1618 {
1619 RPRINTF("registering ctl fifo\n");
1620
1621 /* register ctl fd */
1622 s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
1623
1624 if (s->ctl_fd.id < 0) {
1625 RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
1626 return -1;
1627 }
1628
1629 return 0;
1630 }
1631
1632 /* interface */
1633
1634 static int tdremus_open(td_driver_t *driver, const char *name,
1635 td_flag_t flags)
1636 {
1637 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1638 int rc;
1639
1640 RPRINTF("opening %s\n", name);
1641
1642 /* first we need to get the underlying vbd for this driver stack. To do so we
1643 * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
1644 * 0 (see tapdisk2.c)
1645 */
1646 device_vbd = tapdisk_server_get_vbd(0);
1647
1648 memset(s, 0, sizeof(*s));
1649 s->server_fd.fd = -1;
1650 s->stream_fd.fd = -1;
1651 s->ctl_fd.fd = -1;
1652 s->msg_fd.fd = -1;
1653
1654 /* TODO: this is only needed so that the server can send writes down
1655 * the driver stack from the stream_fd event handler */
1656 s->tdremus_driver = driver;
1657
1658 /* parse name to get info etc */
1659 if ((rc = get_args(driver, name)))
1660 return rc;
1661
1662 if ((rc = ctl_open(driver, name))) {
1663 RPRINTF("error setting up control channel\n");
1664 free(s->driver_data);
1665 return rc;
1666 }
1667
1668 if ((rc = ctl_register(s))) {
1669 RPRINTF("error registering control channel\n");
1670 free(s->driver_data);
1671 return rc;
1672 }
1673
1674 if (!(rc = remus_bind(s)))
1675 rc = switch_mode(driver, mode_backup);
1676 else if (rc == -2)
1677 rc = switch_mode(driver, mode_primary);
1678
1679 if (!rc)
1680 return 0;
1681
1682 tdremus_close(driver);
1683 return -EIO;
1684 }
1685
1686 static int tdremus_close(td_driver_t *driver)
1687 {
1688 struct tdremus_state *s = (struct tdremus_state *)driver->data;
1689
1690 RPRINTF("closing\n");
1691 if (s->ramdisk.inprogress)
1692 hashtable_destroy(s->ramdisk.inprogress, 0);
1693
1694 if (s->driver_data) {
1695 free(s->driver_data);
1696 s->driver_data = NULL;
1697 }
1698 if (s->server_fd.fd >= 0) {
1699 close(s->server_fd.fd);
1700 s->server_fd.fd = -1;
1701 }
1702 if (s->stream_fd.fd >= 0)
1703 close_stream_fd(s);
1704
1705 ctl_close(driver);
1706
1707 return 0;
1708 }
1709
1710 static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
1711 {
1712 /* we shouldn't have a parent... for now */
1713 return -EINVAL;
1714 }
1715
1716 static int tdremus_validate_parent(td_driver_t *driver,
1717 td_driver_t *pdriver, td_flag_t flags)
1718 {
1719 return 0;
1720 }
1721
1722 struct tap_disk tapdisk_remus = {
1723 .disk_type = "tapdisk_remus",
1724 .private_data_size = sizeof(struct tdremus_state),
1725 .td_open = tdremus_open,
1726 .td_queue_read = unprotected_queue_read,
1727 .td_queue_write = unprotected_queue_write,
1728 .td_close = tdremus_close,
1729 .td_get_parent_id = tdremus_get_parent_id,
1730 .td_validate_parent = tdremus_validate_parent,
1731 .td_debug = NULL,
1732 };
1733