1 /* block-remus.c
2  *
3  * This disk sends all writes to a backup via a network interface before
4  * passing them to an underlying device.
5  * The backup is a bit more complicated:
6  *  1. It applies all incoming writes to a ramdisk.
7  *  2. When a checkpoint request arrives, it moves the ramdisk to
8  *     a committing state and uses a new ramdisk for subsequent writes.
9  *     It also acknowledges the request, to let the sender know it can
10  *     release output.
11  *  3. The ramdisk flushes its contents to the underlying driver.
12  *  4. At failover, the backup waits for the in-flight ramdisk (if any) to
13  *     drain before letting the domain be activated.
14  *
15  * The driver determines whether it is the client or server by attempting
16  * to bind to the replication address. If the address is not local,
17  * the driver acts as client.
18  *
19  * The following messages are defined for the replication stream:
20  * 1. write request
21  *    "wreq"      4
22  *    num_sectors 4
23  *    sector      8
24  *    buffer      (num_sectors * sector_size)
 * 2. submit request (may be used as a barrier)
26  *    "sreq"      4
27  * 3. commit request
28  *    "creq"      4
 * After a commit request, the client must wait for a completion message:
30  * 4. completion
31  *    "done"      4
32  */
33 
34 /* due to architectural choices in tapdisk, block-buffer is forced to
35  * reimplement some code which is meant to be private */
36 #include "tapdisk.h"
37 #include "tapdisk-server.h"
38 #include "tapdisk-driver.h"
39 #include "tapdisk-interface.h"
40 #include "hashtable.h"
41 #include "hashtable_itr.h"
42 #include "hashtable_utility.h"
43 
44 #include <errno.h>
45 #include <inttypes.h>
46 #include <fcntl.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <string.h>
50 #include <sys/time.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netdb.h>
54 #include <netinet/in.h>
55 #include <arpa/inet.h>
56 #include <sys/param.h>
57 #include <unistd.h>
58 #include <sys/stat.h>
59 
60 /* timeout for reads and writes in ms */
61 #define HEARTBEAT_MS 1000
62 #define RAMDISK_HASHSIZE 128
63 
64 /* connect retry timeout (seconds) */
65 #define REMUS_CONNRETRY_TIMEOUT 10
66 
67 #define RPRINTF(_f, _a...) syslog (LOG_DEBUG, "remus: " _f, ## _a)
68 
/* role of this driver instance in the replication pair */
enum tdremus_mode {
	mode_invalid = 0,
	mode_unprotected, /* replication disabled; requests pass straight through */
	mode_primary,     /* client: mirrors writes to the backup before issuing them */
	mode_backup       /* server: buffers incoming writes in a ramdisk */
};
75 
/* one buffered write request (as queued on the primary's ring) */
struct tdremus_req {
	uint64_t sector;  /* starting sector of the write */
	int nb_sectors;   /* number of sectors in the request */
	char buf[4096];   /* request data; presumably bounds the max request size — TODO confirm */
};
81 
/* fixed-size circular queue of write requests awaiting batch replication */
struct req_ring {
	/* waste one slot to distinguish between empty and full */
	struct tdremus_req requests[MAX_REQUESTS * 2 + 1];
	unsigned int head; /* index of the oldest queued request */
	unsigned int tail; /* index of the next free slot */
};
88 
/* TODO: This isn't very pretty, but to properly generate our own treqs (needed
 * by the backup) we need to know our td_vbt_t and td_image_t (blktap2
 * internals). As a proper fix, we should consider extending the tapdisk
 * interface with a td_create_request() function, or something similar.
 *
 * For now, we just grab the vbd in the td_open() command, and the td_image_t
 * from the first read request.
 */
td_vbd_t *device_vbd = NULL;    /* captured at td_open() time (see comment above) */
td_image_t *remus_image = NULL; /* captured from the first read request */
struct tap_disk tapdisk_remus;  /* driver ops table; queue handlers are swapped on mode change */
100 
/* backup-side write buffer: sector-granular hash tables keyed by sector
 * number, drained to the base driver at checkpoint boundaries */
struct ramdisk {
	size_t sector_size;   /* bytes per sector (from driver->info) */
	struct hashtable* h;  /* current epoch: sector -> data buffer */
	/* when a ramdisk is flushed, h is given a new empty hash for writes
	 * while the old ramdisk (prev) is drained asynchronously.
	 */
	struct hashtable* prev;
	/* count of outstanding requests to the base driver */
	size_t inflight;
	/* prev holds the requests to be flushed, while inprogress holds
	 * requests being flushed. When requests complete, they are removed
	 * from inprogress.
	 * Whenever a new flush is merged with ongoing flush (i.e, prev),
	 * we have to make sure that none of the new requests overlap with
	 * ones in "inprogress". If it does, keep it back in prev and dont issue
	 * IO until the current one finishes. If we allow this IO to proceed,
	 * we might end up with two "overlapping" requests in the disk's queue and
	 * the disk may not offer any guarantee on which one is written first.
	 * IOW, make sure we dont create a write-after-write time ordering constraint.
	 *
	 */
	struct hashtable* inprogress;
};
124 
/* the ramdisk intercepts the original callback for reads and writes.
 * This holds the original data. */
/* Might be worth making this a static array in struct ramdisk to avoid
 * a malloc per request */

struct tdremus_state;

/* saved completion context for an intercepted request */
struct ramdisk_cbdata {
	td_callback_t cb;            /* original completion callback */
	void* private;               /* original callback argument */
	char* buf;                   /* request data buffer */
	struct tdremus_state* state; /* owning driver state */
};

/* completion context for writes issued while draining the ramdisk */
struct ramdisk_write_cbdata {
	struct tdremus_state* state;
	char* buf;
};

/* signature shared by the per-mode queue_read/queue_write handlers */
typedef void (*queue_rw_t) (td_driver_t *driver, td_request_t treq);

/* poll_fd type for blktap2 fd system. taken from block_log.c */
typedef struct poll_fd {
	int        fd; /* file descriptor being polled */
	event_id_t id; /* tapdisk event registration for this fd */
} poll_fd_t;
151 
/* per-instance driver state shared by primary, backup and unprotected modes */
struct tdremus_state {
//  struct tap_disk* driver;
	void* driver_data;

  /* XXX: this is needed so that the server can perform operations on
   * the driver from the stream_fd event handler. fix this. */
	td_driver_t *tdremus_driver;

	/* TODO: we may wish to replace these two FIFOs with a unix socket */
	char*     ctl_path; /* receive flush instruction here */
	poll_fd_t ctl_fd;     /* io_fd slot for control FIFO */
	char*     msg_path; /* output completion message here */
	poll_fd_t msg_fd;

  /* replication host */
	struct sockaddr_in sa;          /* address of the replication peer */
	poll_fd_t server_fd;    /* server listen port */
	poll_fd_t stream_fd;     /* replication channel */

	/* queue write requests, batch-replicate at submit */
	struct req_ring write_ring;

	/* ramdisk data*/
	struct ramdisk ramdisk;

	/* mode methods */
	enum tdremus_mode mode;               /* current role of this instance */
	int (*queue_flush)(td_driver_t *driver); /* per-mode checkpoint flush hook */
};
181 
/* on-the-wire header for replication messages.
 * NOTE(review): primary_queue_write() currently sends only a 4-byte sector
 * count plus an 8-byte sector number, not this full struct — confirm which
 * layout the peer expects before relying on this definition. */
typedef struct tdremus_wire {
	uint32_t op;   /* message opcode */
	uint64_t id;   /* request identifier */
	uint64_t sec;  /* starting sector */
	uint32_t secs; /* number of sectors */
} tdremus_wire_t;
188 
189 #define TDREMUS_READ "rreq"
190 #define TDREMUS_WRITE "wreq"
191 #define TDREMUS_SUBMIT "sreq"
192 #define TDREMUS_COMMIT "creq"
193 #define TDREMUS_DONE "done"
194 #define TDREMUS_FAIL "fail"
195 
196 /* primary read/write functions */
197 static void primary_queue_read(td_driver_t *driver, td_request_t treq);
198 static void primary_queue_write(td_driver_t *driver, td_request_t treq);
199 
200 /* backup read/write functions */
201 static void backup_queue_read(td_driver_t *driver, td_request_t treq);
202 static void backup_queue_write(td_driver_t *driver, td_request_t treq);
203 
/* unprotected read/write functions */
205 static void unprotected_queue_read(td_driver_t *driver, td_request_t treq);
206 static void unprotected_queue_write(td_driver_t *driver, td_request_t treq);
207 
208 static int tdremus_close(td_driver_t *driver);
209 
210 static int switch_mode(td_driver_t *driver, enum tdremus_mode mode);
211 static int ctl_respond(struct tdremus_state *s, const char *response);
212 
213 /* ring functions */
ring_next(struct req_ring * ring,unsigned int pos)214 static inline unsigned int ring_next(struct req_ring* ring, unsigned int pos)
215 {
216 	if (++pos >= MAX_REQUESTS * 2 + 1)
217 		return 0;
218 
219 	return pos;
220 }
221 
ring_isempty(struct req_ring * ring)222 static inline int ring_isempty(struct req_ring* ring)
223 {
224 	return ring->head == ring->tail;
225 }
226 
ring_isfull(struct req_ring * ring)227 static inline int ring_isfull(struct req_ring* ring)
228 {
229 	return ring_next(ring, ring->tail) == ring->head;
230 }
231 /* Prototype declarations */
232 static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s);
233 
/* functions to create and submit treqs */
235 
/* Completion callback for writes generated by create_write_request().
 * Frees the synthetic vreq, retires the batch from the inprogress hash
 * and releases the merged data buffer. */
static void
replicated_write_callback(td_request_t treq, int err)
{
	struct tdremus_state *s = (struct tdremus_state *) treq.cb_data;
	td_vbd_request_t *vreq;
	int i;
	uint64_t start;
	vreq = (td_vbd_request_t *) treq.private;

	/* the write failed for now, lets panic. this is very bad */
	if (err) {
		RPRINTF("ramdisk write failed, disk image is not consistent\n");
		exit(-1);
	}

	/* The write succeeded. let's pull the vreq off whatever request list
	 * it is on and free() it */
	list_del(&vreq->next);
	free(vreq);

	/* one fewer write outstanding against the base driver */
	s->ramdisk.inflight--;

	/* the batch is no longer being flushed: drop its sectors from the
	 * inprogress hash so later flushes may touch them again */
	start = treq.sec;
	for (i = 0; i < treq.secs; i++) {
		hashtable_remove(s->ramdisk.inprogress, &start);
		start++;
	}
	/* merged data buffer was allocated by merge_requests() */
	free(treq.buf);

	if (!s->ramdisk.inflight && !s->ramdisk.prev) {
		/* TODO: the ramdisk has been flushed */
	}
}
268 
269 static inline int
create_write_request(struct tdremus_state * state,td_sector_t sec,int secs,char * buf)270 create_write_request(struct tdremus_state *state, td_sector_t sec, int secs, char *buf)
271 {
272 	td_request_t treq;
273 	td_vbd_request_t *vreq;
274 
275 	treq.op      = TD_OP_WRITE;
276 	treq.buf     = buf;
277 	treq.sec     = sec;
278 	treq.secs    = secs;
279 	treq.image   = remus_image;
280 	treq.cb      = replicated_write_callback;
281 	treq.cb_data = state;
282 	treq.id      = 0;
283 	treq.sidx    = 0;
284 
285 	vreq         = calloc(1, sizeof(td_vbd_request_t));
286 	treq.private = vreq;
287 
288 	if(!vreq)
289 		return -1;
290 
291 	vreq->submitting = 1;
292 	INIT_LIST_HEAD(&vreq->next);
293 	tapdisk_vbd_move_request(treq.private, &device_vbd->pending_requests);
294 
295 	/* TODO:
296 	 * we should probably leave it up to the caller to forward the request */
297 	td_forward_request(treq);
298 
299 	vreq->submitting--;
300 
301 	return 0;
302 }
303 
304 
/* Mix a 64-bit sector number down to a 32-bit hash bucket.
 * http://www.concentric.net/~Ttwang/tech/inthash.htm */
static unsigned int uint64_hash(void* k)
{
	uint64_t x = *(uint64_t*)k;

	x = (~x) + (x << 18);
	x ^= x >> 31;
	x *= 21;
	x ^= x >> 11;
	x += x << 6;
	x ^= x >> 22;

	return (unsigned int)x;
}
319 
/* Key-equality predicate for the sector hash tables. */
static int rd_hash_equal(void* k1, void* k2)
{
	return *(uint64_t*)k1 == *(uint64_t*)k2;
}
329 
ramdisk_read(struct ramdisk * ramdisk,uint64_t sector,int nb_sectors,char * buf)330 static int ramdisk_read(struct ramdisk* ramdisk, uint64_t sector,
331 			int nb_sectors, char* buf)
332 {
333 	int i;
334 	char* v;
335 	uint64_t key;
336 
337 	for (i = 0; i < nb_sectors; i++) {
338 		key = sector + i;
339 		/* check whether it is queued in a previous flush request */
340 		if (!(ramdisk->prev && (v = hashtable_search(ramdisk->prev, &key)))) {
341 			/* check whether it is an ongoing flush */
342 			if (!(ramdisk->inprogress && (v = hashtable_search(ramdisk->inprogress, &key))))
343 				return -1;
344 		}
345 		memcpy(buf + i * ramdisk->sector_size, v, ramdisk->sector_size);
346 	}
347 
348 	return 0;
349 }
350 
/* Store one sector's data in hash h (keyed by sector number), copying the
 * data into freshly-allocated memory. Overwrites in place when the sector
 * is already present. Returns 0 on success, -1 on allocation or insert
 * failure. */
static int ramdisk_write_hash(struct hashtable* h, uint64_t sector, char* buf,
			      size_t len)
{
	char* v;
	uint64_t* key;

	/* already buffered: just refresh the payload */
	v = hashtable_search(h, &sector);
	if (v) {
		memcpy(v, buf, len);
		return 0;
	}

	v = malloc(len);
	if (!v) {
		DPRINTF("ramdisk_write_hash: malloc failed\n");
		return -1;
	}
	memcpy(v, buf, len);

	key = malloc(sizeof(*key));
	if (!key) {
		DPRINTF("ramdisk_write_hash: error allocating key\n");
		free(v);
		return -1;
	}
	*key = sector;

	if (!hashtable_insert(h, key, v)) {
		DPRINTF("ramdisk_write_hash failed on sector %" PRIu64 "\n", sector);
		free(key);
		free(v);
		return -1;
	}

	return 0;
}
382 
ramdisk_write(struct ramdisk * ramdisk,uint64_t sector,int nb_sectors,char * buf)383 static inline int ramdisk_write(struct ramdisk* ramdisk, uint64_t sector,
384 				int nb_sectors, char* buf)
385 {
386 	int i, rc;
387 
388 	for (i = 0; i < nb_sectors; i++) {
389 		rc = ramdisk_write_hash(ramdisk->h, sector + i,
390 					buf + i * ramdisk->sector_size,
391 					ramdisk->sector_size);
392 		if (rc)
393 			return rc;
394 	}
395 
396 	return 0;
397 }
398 
/* qsort comparator for sector numbers. An explicit three-way compare is
 * required: subtracting unsigned 64-bit values and truncating to int
 * would give wrong signs. */
static int uint64_compare(const void* k1, const void* k2)
{
	uint64_t a = *(const uint64_t*)k1;
	uint64_t b = *(const uint64_t*)k2;

	if (a < b)
		return -1;
	if (a > b)
		return 1;
	return 0;
}
407 
/* set psectors to an array of the sector numbers in the hash, returning
 * the number of entries (or -1 on error). *psectors is untouched when the
 * hash is empty (returns 0); on success the caller owns the array. */
static int ramdisk_get_sectors(struct hashtable* h, uint64_t** psectors)
{
	struct hashtable_itr* itr;
	uint64_t* out;
	int count, i;

	count = hashtable_count(h);
	if (!count)
		return 0;

	out = malloc(count * sizeof(uint64_t));
	if (!out) {
		DPRINTF("ramdisk_get_sectors: error allocating sector map\n");
		return -1;
	}
	*psectors = out;

	/* walk every entry, collecting its key (the sector number) */
	i = 0;
	itr = hashtable_iterator(h);
	do {
		out[i++] = *(uint64_t*)hashtable_iterator_key(itr);
	} while (hashtable_iterator_advance(itr));
	free(itr);

	return i;
}
434 
/*
  return -1 for OOM
  return -2 for merge lookup failure
  return -3 for WAW race
  return 0 on success.
*/
/* Copy `count` consecutive sectors starting at `start` out of the prev hash
 * into one contiguous, page-aligned buffer (*mergedbuf), registering every
 * sector in the inprogress hash on the way. On failure all inprogress
 * entries added by this call are rolled back. The caller owns *mergedbuf. */
static int merge_requests(struct ramdisk* ramdisk, uint64_t start,
			size_t count, char **mergedbuf)
{
	char* buf;
	char* sector;
	int i;
	uint64_t *key;
	int rc = 0;

	/* valloc: page-aligned buffer for the merged I/O */
	if (!(buf = valloc(count * ramdisk->sector_size))) {
		DPRINTF("merge_request: allocation failed\n");
		return -1;
	}

	for (i = 0; i < count; i++) {
		/* every sector of the batch must still be present in prev */
		if (!(sector = hashtable_search(ramdisk->prev, &start))) {
			DPRINTF("merge_request: lookup failed on %"PRIu64"\n", start);
			free(buf);
			rc = -2;
			goto fail;
		}

		/* Check inprogress requests to avoid waw non-determinism.
		 * NOTE(review): entries below are inserted with a NULL value,
		 * and hashtable_search returns the value — so this check may
		 * never fire for those entries; verify against the hashtable
		 * implementation. */
		if (hashtable_search(ramdisk->inprogress, &start)) {
			DPRINTF("merge_request: WAR RACE on %"PRIu64"\n", start);
			free(buf);
			rc = -3;
			goto fail;
		}
		/* Insert req into inprogress (brief period of duplication of hash entries until
		 * they are removed from prev. Read tracking would not be reading wrong entries)
		 */
		if (!(key = malloc(sizeof(*key)))) {
			DPRINTF("%s: error allocating key\n", __FUNCTION__);
			free(buf);
			rc = -1;
			goto fail;
		}
		*key = start;
		if (!hashtable_insert(ramdisk->inprogress, key, NULL)) {
			DPRINTF("%s failed to insert sector %" PRIu64 " into inprogress hash\n",
				__FUNCTION__, start);
			free(key);
			free(buf);
			rc = -1;
			goto fail;
		}
		memcpy(buf + i * ramdisk->sector_size, sector, ramdisk->sector_size);
		start++;
	}

	*mergedbuf = buf;
	return 0;
fail:
	/* undo the inprogress insertions made by earlier (successful)
	 * iterations; `start` currently points at the failing sector */
	for (start--; i >0; i--, start--)
		hashtable_remove(ramdisk->inprogress, &start);
	return rc;
}
499 
/* The underlying driver may not handle having the whole ramdisk queued at
 * once. We queue what we can and let the callbacks attempt to queue more. */
/* NOTE: may be called from callback, while dd->private still belongs to
 * the underlying driver */
/* Drain prev to the base driver: sort the buffered sectors, coalesce runs
 * of consecutive sectors into single write requests, and forward each batch.
 * Batches that lose a WAW race stay in prev for the next flush. */
static int ramdisk_flush(td_driver_t *driver, struct tdremus_state* s)
{
	uint64_t* sectors;
	char* buf = NULL;
	uint64_t base, batchlen;
	int i, j, count = 0;

	/* snapshot the set of buffered sector numbers */
	if ((count = ramdisk_get_sectors(s->ramdisk.prev, &sectors)) <= 0)
		return count;

	/* Create the inprogress table if empty */
	if (!s->ramdisk.inprogress)
		s->ramdisk.inprogress = create_hashtable(RAMDISK_HASHSIZE,
							uint64_hash,
							rd_hash_equal);

	/* sort and merge sectors to improve disk performance */
	qsort(sectors, count, sizeof(*sectors), uint64_compare);

	for (i = 0; i < count;) {
		/* collect a maximal run of consecutive sectors into one batch */
		base = sectors[i++];
		while (i < count && sectors[i] == sectors[i-1] + 1)
			i++;
		batchlen = sectors[i-1] - base + 1;

		j = merge_requests(&s->ramdisk, base, batchlen, &buf);

		if (j) {
			RPRINTF("ramdisk_flush: merge_requests failed:%s\n",
				j == -1? "OOM": (j==-2? "missing sector" : "WAW race"));
			/* WAW race (-3): skip this batch, leaving it in prev
			 * to be retried on the next flush; other errors abort */
			if (j == -3) continue;
			free(sectors);
			return -1;
		}

		/* NOTE: create_write_request() creates a treq AND forwards it down
		 * the driver chain */
		create_write_request(s, base, batchlen, buf);

		s->ramdisk.inflight++;

		/* the batch data now lives in buf (owned by the in-flight
		 * request); free the per-sector copies and drop them from prev */
		for (j = 0; j < batchlen; j++) {
			buf = hashtable_search(s->ramdisk.prev, &base);
			free(buf);
			hashtable_remove(s->ramdisk.prev, &base);
			base++;
		}
	}

	if (!hashtable_count(s->ramdisk.prev)) {
		/* everything is in flight */
		hashtable_destroy(s->ramdisk.prev, 0);
		s->ramdisk.prev = NULL;
	}

	free(sectors);

	return 0;
}
572 
573 /* flush ramdisk contents to disk */
ramdisk_start_flush(td_driver_t * driver)574 static int ramdisk_start_flush(td_driver_t *driver)
575 {
576 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
577 	uint64_t* key;
578 	char* buf;
579 	int rc = 0;
580 	int i, j, count, batchlen;
581 	uint64_t* sectors;
582 
583 	if (!hashtable_count(s->ramdisk.h)) {
584 		/*
585 		  RPRINTF("Nothing to flush\n");
586 		*/
587 		return 0;
588 	}
589 
590 	if (s->ramdisk.prev) {
591 		/* a flush request issued while a previous flush is still in progress
592 		 * will merge with the previous request. If you want the previous
593 		 * request to be consistent, wait for it to complete. */
594 		if ((count = ramdisk_get_sectors(s->ramdisk.h, &sectors)) < 0)
595 			return count;
596 
597 		for (i = 0; i < count; i++) {
598 			buf = hashtable_search(s->ramdisk.h, sectors + i);
599 			ramdisk_write_hash(s->ramdisk.prev, sectors[i], buf,
600 					   s->ramdisk.sector_size);
601 		}
602 		free(sectors);
603 
604 		hashtable_destroy (s->ramdisk.h, 0);
605 	} else
606 		s->ramdisk.prev = s->ramdisk.h;
607 
608 	/* We create a new hashtable so that new writes can be performed before
609 	 * the old hashtable is completely drained. */
610 	s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
611 					rd_hash_equal);
612 
613 	return ramdisk_flush(driver, s);
614 }
615 
616 
/* Initialize the backup-side ramdisk: record the sector size and allocate
 * the first epoch hash. Safe to call twice; the second call is a no-op. */
static int ramdisk_start(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	if (s->ramdisk.h != NULL) {
		RPRINTF("ramdisk already allocated\n");
		return 0;
	}

	s->ramdisk.sector_size = driver->info.sector_size;
	s->ramdisk.h = create_hashtable(RAMDISK_HASHSIZE, uint64_hash,
					rd_hash_equal);

	DPRINTF("Ramdisk started, %zu bytes/sector\n", s->ramdisk.sector_size);

	return 0;
}
634 
/* common client/server functions */
/* mayberead: Time out after a certain interval. */
/* Read exactly len bytes from fd, using select() to wait between short
 * reads. Returns 0 on success; -1 on EOF, read/select error, or timeout.
 * NOTE(review): tv is reused across select() calls; on Linux select()
 * decrements it, making HEARTBEAT_MS an overall budget rather than a
 * per-wait timeout — confirm this is intended. */
static int mread(int fd, void* buf, size_t len)
{
	fd_set rfds;
	int rc;
	size_t cur = 0;
	struct timeval tv = {
		.tv_sec = HEARTBEAT_MS / 1000,
		.tv_usec = (HEARTBEAT_MS % 1000) * 1000
	};

	if (!len)
		return 0;

	/* read first. Only select if read is incomplete. */
	rc = read(fd, buf, len);
	while (rc < 0 || cur + rc < len) {
		if (!rc) {
			/* zero return: the peer closed the connection */
			RPRINTF("end-of-file");
			return -1;
		}
		if (rc < 0 && errno != EAGAIN) {
			RPRINTF("error during read: %s\n", strerror(errno));
			return -1;
		}
		if (rc > 0)
			cur += rc;

		/* wait (with timeout) until the fd is readable again */
		FD_ZERO(&rfds);
		FD_SET(fd, &rfds);
		if (!(rc = select(fd + 1, &rfds, NULL, NULL, &tv))) {
			RPRINTF("time out during read\n");
			return -1;
		} else if (rc < 0) {
			RPRINTF("error during select: %d\n", errno);
			return -1;
		}
		rc = read(fd, buf + cur, len - cur);
	}
	/*
	  RPRINTF("read %d bytes\n", cur + rc);
	*/

	return 0;
}
681 
mwrite(int fd,void * buf,size_t len)682 static int mwrite(int fd, void* buf, size_t len)
683 {
684 	fd_set wfds;
685 	size_t cur = 0;
686 	int rc;
687 	struct timeval tv = {
688 		.tv_sec = HEARTBEAT_MS / 1000,
689 		.tv_usec = (HEARTBEAT_MS % 1000) * 1000
690 	};
691 
692 	if (!len)
693 		return 0;
694 
695 	/* read first. Only select if read is incomplete. */
696 	rc = write(fd, buf, len);
697 	while (rc < 0 || cur + rc < len) {
698 		if (!rc) {
699 			RPRINTF("end-of-file");
700 			return -1;
701 		}
702 		if (rc < 0 && errno != EAGAIN) {
703 			RPRINTF("error during write: %s\n", strerror(errno));
704 			return -1;
705 		}
706 		if (rc > 0)
707 			cur += rc;
708 
709 		FD_ZERO(&wfds);
710 		FD_SET(fd, &wfds);
711 		if (!(rc = select(fd + 1, NULL, &wfds, NULL, &tv))) {
712 			RPRINTF("time out during write\n");
713 			return -1;
714 		} else if (rc < 0) {
715 			RPRINTF("error during select: %d\n", errno);
716 			return -1;
717 		}
718 		rc = write(fd, buf + cur, len - cur);
719 	}
720 	/*
721 	  RPRINTF("wrote %d bytes\n", cur + rc);
722 	*/
723 
724 	return 0;
725 	FD_ZERO(&wfds);
726 	FD_SET(fd, &wfds);
727 	select(fd + 1, NULL, &wfds, NULL, &tv);
728 }
729 
730 
close_stream_fd(struct tdremus_state * s)731 static void inline close_stream_fd(struct tdremus_state *s)
732 {
733 	/* XXX: -2 is magic. replace with macro perhaps? */
734 	tapdisk_server_unregister_event(s->stream_fd.id);
735 	close(s->stream_fd.fd);
736 	s->stream_fd.fd = -2;
737 }
738 
739 /* primary functions */
740 static void remus_client_event(event_id_t, char mode, void *private);
741 static void remus_connect_event(event_id_t id, char mode, void *private);
742 static void remus_retry_connect_event(event_id_t id, char mode, void *private);
743 
primary_do_connect(struct tdremus_state * state)744 static int primary_do_connect(struct tdremus_state *state)
745 {
746 	event_id_t id;
747 	int fd;
748 	int rc;
749 	int flags;
750 
751 	RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));
752 
753 	if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
754 		RPRINTF("could not create client socket: %d\n", errno);
755 		return -1;
756 	}
757 
758 	/* make socket nonblocking */
759 	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
760 		flags = 0;
761 	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
762 		return -1;
763 
764 	/* once we have created the socket and populated the address, we can now start
765 	 * our non-blocking connect. rather than duplicating code we trigger a timeout
766 	 * on the socket fd, which calls out nonblocking connect code
767 	 */
768 	if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, fd, 0, remus_retry_connect_event, state)) < 0) {
769 		RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
770 		/* TODO: we leak a fd here */
771 		return -1;
772 	}
773 	state->stream_fd.fd = fd;
774 	state->stream_fd.id = id;
775 	return 0;
776 }
777 
/* Connect to the backup synchronously, retrying once per second while the
 * peer refuses connections. On success the socket is made non-blocking and
 * registered for read events (remus_client_event). Returns 0 on success,
 * -1 on unrecoverable failure. */
static int primary_blocking_connect(struct tdremus_state *state)
{
	int fd;
	int id;
	int rc;
	int flags;

	RPRINTF("client connecting to %s:%d...\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));

	if ((fd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
		RPRINTF("could not create client socket: %d\n", errno);
		return -1;
	}

	/* retry loop: treat ECONNREFUSED as "backup not up yet" */
	do {
		if ((rc = connect(fd, (struct sockaddr *)&state->sa,
		    sizeof(state->sa))) < 0)
		{
			if (errno == ECONNREFUSED) {
				RPRINTF("connection refused -- retrying in 1 second\n");
				sleep(1);
			} else {
				RPRINTF("connection failed: %d\n", errno);
				close(fd);
				return -1;
			}
		}
	} while (rc < 0);

	RPRINTF("client connected\n");

	/* make socket nonblocking */
	if ((flags = fcntl(fd, F_GETFL, 0)) == -1)
		flags = 0;
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
	{
		RPRINTF("error making socket nonblocking\n");
		close(fd);
		return -1;
	}

	/* wait for replies ("done"/"fail") from the backup */
	if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, fd, 0, remus_client_event, state)) < 0) {
		RPRINTF("error registering client event handler: %s\n", strerror(id));
		close(fd);
		return -1;
	}

	state->stream_fd.fd = fd;
	state->stream_fd.id = id;
	return 0;
}
829 
830 /* on read, just pass request through */
primary_queue_read(td_driver_t * driver,td_request_t treq)831 static void primary_queue_read(td_driver_t *driver, td_request_t treq)
832 {
833 	/* just pass read through */
834 	td_forward_request(treq);
835 }
836 
837 /* TODO:
838  * The primary uses mwrite() to write the contents of a write request to the
839  * backup. This effectively blocks until all data has been copied into a system
840  * buffer or a timeout has occured. We may wish to instead use tapdisk's
841  * nonblocking i/o interface, tapdisk_server_register_event(), to set timeouts
842  * and write data in an asynchronous fashion.
843  */
primary_queue_write(td_driver_t * driver,td_request_t treq)844 static void primary_queue_write(td_driver_t *driver, td_request_t treq)
845 {
846 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
847 
848 	char header[sizeof(uint32_t) + sizeof(uint64_t)];
849 	uint32_t *sectors = (uint32_t *)header;
850 	uint64_t *sector = (uint64_t *)(header + sizeof(uint32_t));
851 
852 	// RPRINTF("write: stream_fd.fd: %d\n", s->stream_fd.fd);
853 
854 	/* -1 means we haven't connected yet, -2 means the connection was lost */
855 	if(s->stream_fd.fd == -1) {
856 		RPRINTF("connecting to backup...\n");
857 		primary_blocking_connect(s);
858 	}
859 
860 	*sectors = treq.secs;
861 	*sector = treq.sec;
862 
863 	if (mwrite(s->stream_fd.fd, TDREMUS_WRITE, strlen(TDREMUS_WRITE)) < 0)
864 		goto fail;
865 	if (mwrite(s->stream_fd.fd, header, sizeof(header)) < 0)
866 		goto fail;
867 
868 	if (mwrite(s->stream_fd.fd, treq.buf, treq.secs * driver->info.sector_size) < 0)
869 		goto fail;
870 
871 	td_forward_request(treq);
872 
873 	return;
874 
875  fail:
876 	/* switch to unprotected mode and tell tapdisk to retry */
877 	RPRINTF("write request replication failed, switching to unprotected mode");
878 	switch_mode(s->tdremus_driver, mode_unprotected);
879 	td_complete_request(treq, -EBUSY);
880 }
881 
882 
/* Checkpoint hook for the primary: ask the backup to commit the current
 * epoch. On write failure the replication channel is torn down. */
static int client_flush(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	/* not connected yet: there is no buffered epoch to commit */
	if (s->stream_fd.fd == -1)
		return 0;

	if (mwrite(s->stream_fd.fd, TDREMUS_COMMIT, strlen(TDREMUS_COMMIT)) < 0) {
		RPRINTF("error flushing output");
		close_stream_fd(s);
		return -1;
	}

	return 0;
}
901 
/* Checkpoint hook for the backup: push any sectors still queued in prev
 * down to the base driver. */
static int server_flush(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	/* no checkpoint has produced a flush queue yet */
	if (s->ramdisk.prev == NULL)
		return 0;

	return ramdisk_flush(driver, s);
}
913 
/* Install the primary (client) handlers and mark the replication channel
 * as not-yet-connected. */
static int primary_start(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	RPRINTF("activating client mode\n");

	tapdisk_remus.td_queue_read  = primary_queue_read;
	tapdisk_remus.td_queue_write = primary_queue_write;
	s->queue_flush = client_flush;

	/* -1: connection not yet established (see primary_queue_write) */
	s->stream_fd.fd = -1;
	s->stream_fd.id = -1;

	return 0;
}
929 
/* timeout callback */
/* Attempt (or re-attempt) the non-blocking connect to the backup. On a
 * transient error the event is re-armed with a retry timeout; once connect()
 * starts, a write-readiness handler (remus_connect_event) takes over. */
static void remus_retry_connect_event(event_id_t id, char mode, void *private)
{
	struct tdremus_state *s = (struct tdremus_state *)private;

	/* do a non-blocking connect */
	if (connect(s->stream_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa))
	    && errno != EINPROGRESS)
	{
		if(errno == ECONNREFUSED || errno == ENETUNREACH || errno == EAGAIN || errno == ECONNABORTED)
		{
			/* try again in a second */
			tapdisk_server_unregister_event(s->stream_fd.id);
			if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
				RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
				return;
			}
			s->stream_fd.id = id;
		}
		else
		{
			/* not recoverable */
			RPRINTF("error connection to server %s\n", strerror(errno));
			return;
		}
	}
	else
	{
		/* the connect returned EINPROGRESS (nonblocking connect) we must wait for the fd to be writeable to determine if the connect worked */

		tapdisk_server_unregister_event(s->stream_fd.id);
		if((id = tapdisk_server_register_event(SCHEDULER_POLL_WRITE_FD, s->stream_fd.fd, 0, remus_connect_event, s)) < 0) {
			RPRINTF("error registering client connection event handler: %s\n", strerror(id));
			return;
		}
		s->stream_fd.id = id;
	}
}
968 
/* callback when nonblocking connect() is finished */
/* called only by primary in unprotected state */
/* Check SO_ERROR to learn the outcome of the in-progress connect. On a
 * transient failure, fall back to the retry-timeout handler; on success,
 * install the reply handler and switch the driver to primary mode. */
static void remus_connect_event(event_id_t id, char mode, void *private)
{
	int socket_errno;
	socklen_t socket_errno_size;
	struct tdremus_state *s = (struct tdremus_state *)private;

	/* check to se if the connect succeeded */
	socket_errno_size = sizeof(socket_errno);
	if (getsockopt(s->stream_fd.fd, SOL_SOCKET, SO_ERROR, &socket_errno, &socket_errno_size)) {
		RPRINTF("error getting socket errno\n");
		return;
	}

	RPRINTF("socket connect returned %d\n", socket_errno);

	if(socket_errno)
	{
		/* the connect did not succeed */

		if(socket_errno == ECONNREFUSED || socket_errno == ENETUNREACH || socket_errno == ETIMEDOUT
		   || socket_errno == ECONNABORTED || socket_errno == EAGAIN)
		{
			/* we can probably assume that the backup is down. just try again later */
			tapdisk_server_unregister_event(s->stream_fd.id);
			if((id = tapdisk_server_register_event(SCHEDULER_POLL_TIMEOUT, s->stream_fd.fd, REMUS_CONNRETRY_TIMEOUT, remus_retry_connect_event, s)) < 0) {
				RPRINTF("error registering timeout client connection event handler: %s\n", strerror(id));
				return;
			}
			s->stream_fd.id = id;
		}
		else
		{
			RPRINTF("socket connect returned %d, giving up\n", socket_errno);
		}
	}
	else
	{
		/* the connect succeeded */

		/* unregister this function and register a new event handler */
		tapdisk_server_unregister_event(s->stream_fd.id);
		if((id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->stream_fd.fd, 0, remus_client_event, s)) < 0) {
			RPRINTF("error registering client event handler: %s\n", strerror(id));
			return;
		}
		s->stream_fd.id = id;

		/* switch from unprotected to protected client */
		switch_mode(s->tdremus_driver, mode_primary);
	}
}
1022 
1023 
1024 /* we install this event handler on the primary once we have connected to the backup */
1025 /* wait for "done" message to commit checkpoint */
remus_client_event(event_id_t id,char mode,void * private)1026 static void remus_client_event(event_id_t id, char mode, void *private)
1027 {
1028 	struct tdremus_state *s = (struct tdremus_state *)private;
1029 	char req[5];
1030 	int rc;
1031 
1032 	if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1033 		/* replication stream closed or otherwise broken (timeout, reset, &c) */
1034 		RPRINTF("error reading from backup\n");
1035 		close_stream_fd(s);
1036 		return;
1037 	}
1038 
1039 	req[4] = '\0';
1040 
1041 	if (!strcmp(req, TDREMUS_DONE))
1042 		/* checkpoint committed, inform msg_fd */
1043 		ctl_respond(s, TDREMUS_DONE);
1044 	else {
1045 		RPRINTF("received unknown message: %s\n", req);
1046 		close_stream_fd(s);
1047 	}
1048 
1049 	return;
1050 }
1051 
1052 /* backup functions */
1053 static void remus_server_event(event_id_t id, char mode, void *private);
1054 
1055 /* returns the socket that receives write requests */
remus_server_accept(event_id_t id,char mode,void * private)1056 static void remus_server_accept(event_id_t id, char mode, void* private)
1057 {
1058 	struct tdremus_state* s = (struct tdremus_state *) private;
1059 
1060 	int stream_fd;
1061 	event_id_t cid;
1062 
1063 	/* XXX: add address-based black/white list */
1064 	if ((stream_fd = accept(s->server_fd.fd, NULL, NULL)) < 0) {
1065 		RPRINTF("error accepting connection: %d\n", errno);
1066 		return;
1067 	}
1068 
1069 	/* TODO: check to see if we are already replicating. if so just close the
1070 	 * connection (or do something smarter) */
1071 	RPRINTF("server accepted connection\n");
1072 
1073 	/* add tapdisk event for replication stream */
1074 	cid = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, stream_fd, 0,
1075 					    remus_server_event, s);
1076 
1077 	if(cid < 0) {
1078 		RPRINTF("error registering connection event handler: %s\n", strerror(errno));
1079 		close(stream_fd);
1080 		return;
1081 	}
1082 
1083 	/* store replication file descriptor */
1084 	s->stream_fd.fd = stream_fd;
1085 	s->stream_fd.id = cid;
1086 }
1087 
1088 /* returns -2 if EADDRNOTAVAIL */
remus_bind(struct tdremus_state * s)1089 static int remus_bind(struct tdremus_state* s)
1090 {
1091 //  struct sockaddr_in sa;
1092 	int opt;
1093 	int rc = -1;
1094 
1095 	if ((s->server_fd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
1096 		RPRINTF("could not create server socket: %d\n", errno);
1097 		return rc;
1098 	}
1099 	opt = 1;
1100 	if (setsockopt(s->server_fd.fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
1101 		RPRINTF("Error setting REUSEADDR on %d: %d\n", s->server_fd.fd, errno);
1102 
1103 	if (bind(s->server_fd.fd, (struct sockaddr *)&s->sa, sizeof(s->sa)) < 0) {
1104 		RPRINTF("could not bind server socket %d to %s:%d: %d %s\n", s->server_fd.fd,
1105 			inet_ntoa(s->sa.sin_addr), ntohs(s->sa.sin_port), errno, strerror(errno));
1106 		if (errno != EADDRINUSE)
1107 			rc = -2;
1108 		goto err_sfd;
1109 	}
1110 	if (listen(s->server_fd.fd, 10)) {
1111 		RPRINTF("could not listen on socket: %d\n", errno);
1112 		goto err_sfd;
1113 	}
1114 
1115 	/* The socket s now bound to the address and listening so we may now register
1116    * the fd with tapdisk */
1117 
1118 	if((s->server_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD,
1119 							    s->server_fd.fd, 0,
1120 							    remus_server_accept, s)) < 0) {
1121 		RPRINTF("error registering server connection event handler: %s",
1122 			strerror(s->server_fd.id));
1123 		goto err_sfd;
1124 	}
1125 
1126 	return 0;
1127 
1128  err_sfd:
1129 	close(s->server_fd.fd);
1130 	s->server_fd.fd = -1;
1131 
1132 	return rc;
1133 }
1134 
1135 /* wait for latest checkpoint to be applied */
server_writes_inflight(td_driver_t * driver)1136 static inline int server_writes_inflight(td_driver_t *driver)
1137 {
1138 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1139 
1140 	if (!s->ramdisk.inflight && !s->ramdisk.prev)
1141 		return 0;
1142 
1143 	return 1;
1144 }
1145 
1146 /* Due to block device prefetching this code may be called on the server side
1147  * during normal replication. In this case we must return EBUSY, otherwise the
1148  * domain may be started with stale data.
1149  */
backup_queue_read(td_driver_t * driver,td_request_t treq)1150 void backup_queue_read(td_driver_t *driver, td_request_t treq)
1151 {
1152 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1153 	int i;
1154 	if(!remus_image)
1155 		remus_image = treq.image;
1156 
1157 	/* check if this read is queued in any currently ongoing flush */
1158 	if (ramdisk_read(&s->ramdisk, treq.sec, treq.secs, treq.buf)) {
1159 		/* TODO: Add to pending read hash */
1160 		td_forward_request(treq);
1161 	} else {
1162 		/* complete the request */
1163 		td_complete_request(treq, 0);
1164 	}
1165 }
1166 
1167 /* see above */
backup_queue_write(td_driver_t * driver,td_request_t treq)1168 void backup_queue_write(td_driver_t *driver, td_request_t treq)
1169 {
1170 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1171 
1172 	/* on a server write, we know the domain has failed over. we must change our
1173 	 * state to unprotected and then have the unprotected queue_write function
1174 	 * handle the write
1175 	 */
1176 
1177 	switch_mode(driver, mode_unprotected);
1178 	/* TODO: call the appropriate write function rather than return EBUSY */
1179 	td_complete_request(treq, -EBUSY);
1180 }
1181 
/* enter backup (server) mode: start the ramdisk and install the backup
 * read/write handlers. returns 0 on success, -1 if the ramdisk fails. */
static int backup_start(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;
	/* removed unused local `fd` */

	if (ramdisk_start(driver) < 0)
		return -1;

	tapdisk_remus.td_queue_read = backup_queue_read;
	tapdisk_remus.td_queue_write = backup_queue_write;
	s->queue_flush = server_flush;
	/* TODO set flush function */
	return 0;
}
1196 
/* read one write request ("wreq") from the replication stream and apply it
 * to the current ramdisk. wire format after the 4-byte tag:
 *   num_sectors (4 bytes), sector (8 bytes), payload (num_sectors * sector_size).
 * returns 0 on success, -1 on error (the stream is closed on read failure). */
static int server_do_wreq(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;
	char buf[4096];
	int len;
	/* removed unused locals `twreq` and `rc` */

	char header[sizeof(uint32_t) + sizeof(uint64_t)];
	uint32_t sectors;
	uint64_t sector;

	if (mread(s->stream_fd.fd, header, sizeof(header)) < 0)
		goto err;

	/* decode with memcpy instead of casting &header[k] to uint32_t*/uint64_t*:
	 * the casts were misaligned-access and strict-aliasing hazards */
	memcpy(&sectors, header, sizeof(sectors));
	memcpy(&sector, header + sizeof(sectors), sizeof(sector));

	len = sectors * driver->info.sector_size;

	if (len > sizeof(buf)) {
		/* freak out! */
		RPRINTF("write request too large: %d/%u\n", len, (unsigned)sizeof(buf));
		return -1;
	}

	if (mread(s->stream_fd.fd, buf, len) < 0)
		goto err;

	if (ramdisk_write(&s->ramdisk, sector, sectors, buf) < 0)
		goto err;

	return 0;

 err:
	/* should start failover */
	RPRINTF("backup write request error\n");
	close_stream_fd(s);

	return -1;
}
1239 
/* handle a submit request ("sreq"): currently nothing to do — the message
 * only serves as a barrier in the stream */
static int server_do_sreq(td_driver_t *driver)
{
	return 0;
}
1248 
1249 /* at this point, the server can start applying the most recent
1250  * ramdisk. */
server_do_creq(td_driver_t * driver)1251 static int server_do_creq(td_driver_t *driver)
1252 {
1253 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1254 
1255 	// RPRINTF("committing buffer\n");
1256 
1257 	ramdisk_start_flush(driver);
1258 
1259 	/* XXX this message should not be sent until flush completes! */
1260 	if (write(s->stream_fd.fd, TDREMUS_DONE, strlen(TDREMUS_DONE)) != 4)
1261 		return -1;
1262 
1263 	return 0;
1264 }
1265 
1266 
1267 /* called when data is pending in s->rfd */
remus_server_event(event_id_t id,char mode,void * private)1268 static void remus_server_event(event_id_t id, char mode, void *private)
1269 {
1270 	struct tdremus_state *s = (struct tdremus_state *)private;
1271 	td_driver_t *driver = s->tdremus_driver;
1272 	char req[5];
1273 
1274 	// RPRINTF("replication data waiting\n");
1275 
1276 	/* TODO: add a get_connection_by_event_id() function.
1277 	 * for now we can assume that the fd is s->stream_fd */
1278 
1279 	if (mread(s->stream_fd.fd, req, sizeof(req) - 1) < 0) {
1280 		RPRINTF("error reading server event, activating backup\n");
1281 		switch_mode(driver, mode_unprotected);
1282 		return;
1283 	}
1284 
1285 	req[4] = '\0';
1286 
1287 	if (!strcmp(req, TDREMUS_WRITE))
1288 		server_do_wreq(driver);
1289 	else if (!strcmp(req, TDREMUS_SUBMIT))
1290 		server_do_sreq(driver);
1291 	else if (!strcmp(req, TDREMUS_COMMIT))
1292 		server_do_creq(driver);
1293 	else
1294 		RPRINTF("unknown request received: %s\n", req);
1295 
1296 	return;
1297 
1298 }
1299 
1300 /* unprotected */
1301 
/* unprotected-mode read: pass straight through once the final checkpoint
 * has drained; until then, fail with EBUSY so stale data is never served */
void unprotected_queue_read(td_driver_t *driver, td_request_t treq)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	if (!server_writes_inflight(driver)) {
		/* nothing pending: reads go straight to the underlying driver */
		td_forward_request(treq);
		return;
	}

	/* a previous ramdisk has not finished flushing. for now just return
	 * EBUSY; if there are left-over requests in prev and nothing is
	 * actively in flight, kick them again. */
	if (!s->ramdisk.inflight)
		ramdisk_flush(driver, s);

	td_complete_request(treq, -EBUSY);
}
1322 
1323 /* For a recoverable remus solution we need to log unprotected writes here */
unprotected_queue_write(td_driver_t * driver,td_request_t treq)1324 void unprotected_queue_write(td_driver_t *driver, td_request_t treq)
1325 {
1326 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1327 
1328 	/* wait for previous ramdisk to flush */
1329 	if (server_writes_inflight(driver)) {
1330 		RPRINTF("queue_write: waiting for queue to drain");
1331 		if(!s->ramdisk.inflight) /* nothing in inprogress. Kick prev */
1332 			ramdisk_flush(driver, s);
1333 		td_complete_request(treq, -EBUSY);
1334 	}
1335 	else {
1336 		// RPRINTF("servicing write request on backup\n");
1337 		/* NOTE: DRBD style bitmap tracking could go here */
1338 		td_forward_request(treq);
1339 	}
1340 }
1341 
/* enter passthrough mode after the peer is lost: tear down replication
 * state and install the unprotected read/write handlers. always returns 0. */
static int unprotected_start(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	RPRINTF("failure detected, activating passthrough\n");

	/* close the replication stream (the original comments had the stream
	 * and server-socket labels swapped) */
	close_stream_fd(s);

	/* unregister the accept handler and close the listening server socket */
	tapdisk_server_unregister_event(s->server_fd.id);
	if (s->server_fd.fd >= 0) {
		/* fix: guard against closing an fd that was never opened (-1) */
		close(s->server_fd.fd);
	}
	s->server_fd.fd = -1;

	/* install the unprotected read/write handlers */
	tapdisk_remus.td_queue_read = unprotected_queue_read;
	tapdisk_remus.td_queue_write = unprotected_queue_write;

	return 0;
}
1364 
1365 
1366 /* control */
1367 
/* resolve addr to an IPv4 address, stored in *ia in network byte order.
 * returns 0 on success, -1 on lookup failure. */
static inline int resolve_address(const char *addr, struct in_addr *ia)
{
	struct hostent *he;
	uint32_t ip;

	if (!(he = gethostbyname(addr))) {
		RPRINTF("error resolving %s: %d\n", addr, h_errno);
		return -1;
	}

	if (!he->h_addr_list[0]) {
		RPRINTF("no address found for %s\n", addr);
		return -1;
	}

	/* fix: copy the address bytes instead of the old
	 * `*((uint32_t**)he->h_addr_list)[0]` cast, which relied on the char*
	 * entry being aligned for uint32_t and violated strict aliasing */
	memcpy(&ip, he->h_addr_list[0], sizeof(ip));

	/* network byte order */
	ia->s_addr = ip;

	return 0;
}
1389 
/* parse the "host:port" spec in name, resolve it with getaddrinfo, and
 * store the first IPv4 result in state->sa.
 * returns 0 on success, -ENOENT on parse/resolution failure, -ENOMEM on
 * allocation failure. */
static int get_args(td_driver_t *driver, const char *name)
{
	struct tdremus_state *state = (struct tdremus_state *)driver->data;
	char *host;
	const char *port;
	int gai_status;
	int valid_addr;
	struct addrinfo gai_hints;
	struct addrinfo *servinfo, *servinfo_itr;
	/* removed unused loop locals `addr` and `ipver` */

	memset(&gai_hints, 0, sizeof gai_hints);
	gai_hints.ai_family = AF_UNSPEC;
	gai_hints.ai_socktype = SOCK_STREAM;

	port = strchr(name, ':');
	if (!port) {
		RPRINTF("missing host in %s\n", name);
		return -ENOENT;
	}
	if (!(host = strndup(name, port - name))) {
		RPRINTF("unable to allocate host\n");
		return -ENOMEM;
	}
	port++;

	gai_status = getaddrinfo(host, port, &gai_hints, &servinfo);
	/* fix: host was strndup'd but never freed on any path */
	free(host);
	host = NULL;
	if (gai_status != 0) {
		RPRINTF("getaddrinfo error: %s\n", gai_strerror(gai_status));
		return -ENOENT;
	}

	/* TODO: do something smarter here */
	valid_addr = 0;
	for (servinfo_itr = servinfo; servinfo_itr != NULL; servinfo_itr = servinfo_itr->ai_next) {
		if (servinfo_itr->ai_family == AF_INET) {
			valid_addr = 1;
			memset(&state->sa, 0, sizeof(state->sa));
			state->sa = *(struct sockaddr_in *)servinfo_itr->ai_addr;
			break;
		}
	}
	freeaddrinfo(servinfo);

	if (!valid_addr)
		return -ENOENT;

	RPRINTF("host: %s, port: %d\n", inet_ntoa(state->sa.sin_addr), ntohs(state->sa.sin_port));

	return 0;
}
1450 
/* transition the driver between unprotected/primary/backup modes.
 * any queued writes are flushed first; if the flush fails we fall back to
 * unprotected mode instead of the requested one.
 * returns 0 on success (s->mode updated), nonzero from the start function
 * on failure (s->mode unchanged). */
static int switch_mode(td_driver_t *driver, enum tdremus_mode mode)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;
	int rc;

	/* no-op if we are already in the requested mode */
	if (mode == s->mode)
		return 0;

	if (s->queue_flush)
		if ((rc = s->queue_flush(driver)) < 0) {
			// fall back to unprotected mode on error
			RPRINTF("switch_mode: error flushing queue (old: %d, new: %d)", s->mode, mode);
			mode = mode_unprotected;
		}

	/* each start function installs its own queue_read/queue_write handlers */
	if (mode == mode_unprotected)
		rc = unprotected_start(driver);
	else if (mode == mode_primary)
		rc = primary_start(driver);
	else if (mode == mode_backup)
		rc = backup_start(driver);
	else {
		RPRINTF("unknown mode requested: %d\n", mode);
		rc = -1;
	}

	/* only record the new mode if the start function succeeded */
	if (!rc)
		s->mode = mode;

	return rc;
}
1482 
/* handle a command arriving on the control FIFO (currently only "flush") */
static void ctl_request(event_id_t id, char mode, void *private)
{
	struct tdremus_state *s = (struct tdremus_state *)private;
	td_driver_t *driver = s->tdremus_driver;
	char msg[80];
	int rc;

	rc = read(s->ctl_fd.fd, msg, sizeof(msg) - 1 /* append nul */);
	if (rc == 0) {
		/* EOF on a FIFO means the writer went away: reopen it */
		RPRINTF("0-byte read received, reopening FIFO\n");
		/*TODO: we may have to unregister/re-register with tapdisk_server */
		close(s->ctl_fd.fd);
		RPRINTF("FIFO closed\n");
		if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0)
			RPRINTF("error reopening FIFO: %d\n", errno);
		return;
	}

	if (rc < 0) {
		RPRINTF("error reading from FIFO: %d\n", errno);
		return;
	}

	/* TODO: need to get driver somehow */
	msg[rc] = '\0';
	if (strncmp(msg, "flush", 5) == 0) {
		if (s->queue_flush && (rc = s->queue_flush(driver))) {
			RPRINTF("error passing flush request to backup");
			ctl_respond(s, TDREMUS_FAIL);
		}
	} else {
		RPRINTF("unknown command: %s\n", msg);
	}
}
1520 
ctl_respond(struct tdremus_state * s,const char * response)1521 static int ctl_respond(struct tdremus_state *s, const char *response)
1522 {
1523 	int rc;
1524 
1525 	if ((rc = write(s->msg_fd.fd, response, strlen(response))) < 0) {
1526 		RPRINTF("error writing notification: %d\n", errno);
1527 		close(s->msg_fd.fd);
1528 		if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0)
1529 			RPRINTF("error reopening FIFO: %d\n", errno);
1530 	}
1531 
1532 	return rc;
1533 }
1534 
1535 /* must be called after the underlying driver has been initialized */
ctl_open(td_driver_t * driver,const char * name)1536 static int ctl_open(td_driver_t *driver, const char* name)
1537 {
1538 	struct tdremus_state *s = (struct tdremus_state *)driver->data;
1539 	int i, l;
1540 
1541 	/* first we must ensure that BLKTAP_CTRL_DIR exists */
1542 	if (mkdir(BLKTAP_CTRL_DIR, 0755) && errno != EEXIST)
1543 	{
1544 		DPRINTF("error creating directory %s: %d\n", BLKTAP_CTRL_DIR, errno);
1545 		return -1;
1546 	}
1547 
1548 	/* use the device name to create the control fifo path */
1549 	if (asprintf(&s->ctl_path, BLKTAP_CTRL_DIR "/remus_%s", name) < 0)
1550 		return -1;
1551 	/* scrub fifo pathname  */
1552 	for (i = strlen(BLKTAP_CTRL_DIR) + 1, l = strlen(s->ctl_path); i < l; i++) {
1553 		if (strchr(":/", s->ctl_path[i]))
1554 			s->ctl_path[i] = '_';
1555 	}
1556 	if (asprintf(&s->msg_path, "%s.msg", s->ctl_path) < 0)
1557 		goto err_ctlfifo;
1558 
1559 	if (mkfifo(s->ctl_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1560 		RPRINTF("error creating control FIFO %s: %d\n", s->ctl_path, errno);
1561 		goto err_msgfifo;
1562 	}
1563 
1564 	if (mkfifo(s->msg_path, S_IRWXU|S_IRWXG|S_IRWXO) && errno != EEXIST) {
1565 		RPRINTF("error creating message FIFO %s: %d\n", s->msg_path, errno);
1566 		goto err_msgfifo;
1567 	}
1568 
1569 	/* RDWR so that fd doesn't block select when no writer is present */
1570 	if ((s->ctl_fd.fd = open(s->ctl_path, O_RDWR)) < 0) {
1571 		RPRINTF("error opening control FIFO %s: %d\n", s->ctl_path, errno);
1572 		goto err_msgfifo;
1573 	}
1574 
1575 	if ((s->msg_fd.fd = open(s->msg_path, O_RDWR)) < 0) {
1576 		RPRINTF("error opening message FIFO %s: %d\n", s->msg_path, errno);
1577 		goto err_openctlfifo;
1578 	}
1579 
1580 	RPRINTF("control FIFO %s\n", s->ctl_path);
1581 	RPRINTF("message FIFO %s\n", s->msg_path);
1582 
1583 	return 0;
1584 
1585  err_openctlfifo:
1586 	close(s->ctl_fd.fd);
1587  err_msgfifo:
1588 	free(s->msg_path);
1589 	s->msg_path = NULL;
1590  err_ctlfifo:
1591 	free(s->ctl_path);
1592 	s->ctl_path = NULL;
1593 	return -1;
1594 }
1595 
/* release control-channel resources: close both FIFO fds and unlink/free
 * their paths. safe to call repeatedly. */
static void ctl_close(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	/* TODO: close *all* connections */

	/* fix: the old `if (s->ctl_fd.fd)` treated fd 0 as closed and fd -1
	 * as open; use an explicit >= 0 check and reset after closing */
	if (s->ctl_fd.fd >= 0) {
		close(s->ctl_fd.fd);
		s->ctl_fd.fd = -1;
	}

	/* fix: the message FIFO fd was never closed (fd leak) */
	if (s->msg_fd.fd >= 0) {
		close(s->msg_fd.fd);
		s->msg_fd.fd = -1;
	}

	if (s->ctl_path) {
		unlink(s->ctl_path);
		free(s->ctl_path);
		s->ctl_path = NULL;
	}
	if (s->msg_path) {
		unlink(s->msg_path);
		free(s->msg_path);
		s->msg_path = NULL;
	}
}
1616 
ctl_register(struct tdremus_state * s)1617 static int ctl_register(struct tdremus_state *s)
1618 {
1619 	RPRINTF("registering ctl fifo\n");
1620 
1621 	/* register ctl fd */
1622 	s->ctl_fd.id = tapdisk_server_register_event(SCHEDULER_POLL_READ_FD, s->ctl_fd.fd, 0, ctl_request, s);
1623 
1624 	if (s->ctl_fd.id < 0) {
1625 		RPRINTF("error registering ctrl FIFO %s: %d\n", s->ctl_path, s->ctl_fd.id);
1626 		return -1;
1627 	}
1628 
1629 	return 0;
1630 }
1631 
1632 /* interface */
1633 
/* open the remus driver: parse the "host:port" spec in name, set up the
 * control/message FIFOs, and decide primary vs backup by attempting to bind
 * the replication address (successful bind -> backup; bind error -2, i.e.
 * address not local -> primary). returns 0 on success or a negative errno. */
static int tdremus_open(td_driver_t *driver, const char *name,
			td_flag_t flags)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;
	int rc;

	RPRINTF("opening %s\n", name);

	/* first we need to get the underlying vbd for this driver stack. To do so we
	 * need to know the vbd's id. Fortunately, for tapdisk2 this is hard-coded as
	 * 0 (see tapdisk2.c)
	 */
	device_vbd = tapdisk_server_get_vbd(0);

	/* mark all fds invalid so cleanup paths can test them safely */
	memset(s, 0, sizeof(*s));
	s->server_fd.fd = -1;
	s->stream_fd.fd = -1;
	s->ctl_fd.fd = -1;
	s->msg_fd.fd = -1;

	/* TODO: this is only needed so that the server can send writes down
	 * the driver stack from the stream_fd event handler */
	s->tdremus_driver = driver;

	/* parse name to get info etc */
	if ((rc = get_args(driver, name)))
		return rc;

	if ((rc = ctl_open(driver, name))) {
		RPRINTF("error setting up control channel\n");
		free(s->driver_data);
		return rc;
	}

	if ((rc = ctl_register(s))) {
		RPRINTF("error registering control channel\n");
		free(s->driver_data);
		return rc;
	}

	/* remus_bind returns 0 when the address is local (we are the backup)
	 * and -2 when it is not (we are the primary); see remus_bind */
	if (!(rc = remus_bind(s)))
		rc = switch_mode(driver, mode_backup);
	else if (rc == -2)
		rc = switch_mode(driver, mode_primary);

	if (!rc)
		return 0;

	/* any failure past this point: tear everything down */
	tdremus_close(driver);
	return -EIO;
}
1685 
/* tear down driver state: ramdisk tracking table, driver buffer, any open
 * sockets, and the control channel. always returns 0. */
static int tdremus_close(td_driver_t *driver)
{
	struct tdremus_state *s = (struct tdremus_state *)driver->data;

	RPRINTF("closing\n");

	if (s->ramdisk.inprogress)
		hashtable_destroy(s->ramdisk.inprogress, 0);

	/* free(NULL) is a no-op, so no separate guard is needed */
	free(s->driver_data);
	s->driver_data = NULL;

	if (s->server_fd.fd >= 0) {
		close(s->server_fd.fd);
		s->server_fd.fd = -1;
	}
	if (s->stream_fd.fd >= 0)
		close_stream_fd(s);

	ctl_close(driver);

	return 0;
}
1709 
/* remus disks have no parent image — always report EINVAL */
static int tdremus_get_parent_id(td_driver_t *driver, td_disk_id_t *id)
{
	/* we shouldn't have a parent... for now */
	return -EINVAL;
}
1715 
/* no constraints are placed on the parent driver — accept unconditionally */
static int tdremus_validate_parent(td_driver_t *driver,
				   td_driver_t *pdriver, td_flag_t flags)
{
	return 0;
}
1721 
/* driver descriptor exported to tapdisk. note that td_queue_read and
 * td_queue_write are reassigned at runtime by the mode start functions
 * (see backup_start and unprotected_start); the unprotected handlers are
 * only the initial values. */
struct tap_disk tapdisk_remus = {
	.disk_type          = "tapdisk_remus",
	.private_data_size  = sizeof(struct tdremus_state),
	.td_open            = tdremus_open,
	.td_queue_read      = unprotected_queue_read,
	.td_queue_write     = unprotected_queue_write,
	.td_close           = tdremus_close,
	.td_get_parent_id   = tdremus_get_parent_id,
	.td_validate_parent = tdremus_validate_parent,
	.td_debug           = NULL,
};
1733