1(*
2 * Copyright (C) 2006-2007 XenSource Ltd.
3 * Copyright (C) 2008      Citrix Ltd.
4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
5 * Author Thomas Gazagnaire <thomas.gazagnaire@eu.citrix.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published
9 * by the Free Software Foundation; version 2.1 only. with the special
10 * exception on linking described in file LICENSE.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU Lesser General Public License for more details.
16 *)
17
18open Printf
19open Parse_arg
20open Stdext
21
(* Logging shortcuts for this file: each one forwards a printf-style format
   (followed by its arguments) to the corresponding [Logging] function,
   passing "xenstored" as the first argument.  They are kept eta-expanded so
   the format type stays polymorphic across call sites. *)
let error msg_fmt = Logging.error "xenstored" msg_fmt
let debug msg_fmt = Logging.debug "xenstored" msg_fmt
let info msg_fmt = Logging.info "xenstored" msg_fmt
25
26(*------------ event klass processors --------------*)
(* Service the connections behind the ready descriptors: [Process.do_input]
   is run for every descriptor of [rset], then [Process.do_output] for every
   descriptor of [wset].  A connection whose handler fails with a read/write
   [Unix.Unix_error] or with [Xenbus.Xb.End_of_file] is removed through
   [Connections.del_anonymous]. *)
let process_connection_fds store cons domains rset wset =
	(* Apply [handler] to [con], dropping the connection on I/O failure. *)
	let guarded handler con =
		let drop () = Connections.del_anonymous cons con in
		try
			handler store cons domains con
		with
		| Unix.Unix_error (errno, "write", _) ->
			drop ();
			error "closing socket connection: write error: %s"
			      (Unix.error_message errno)
		| Unix.Unix_error (errno, "read", _) ->
			drop ();
			(* a reset by the peer is not worth an error message *)
			if errno <> Unix.ECONNRESET then
				error "closing socket connection: read error: %s"
				      (Unix.error_message errno)
		| Xenbus.Xb.End_of_file ->
			drop ();
			debug "closing socket connection"
	in
	(* Look up the connection for [fd] and serve it.  [Not_found] is ignored
	   whether it comes from the lookup or from the handler itself. *)
	let serve handler fd =
		try guarded handler (Connections.find cons fd)
		with Not_found -> ()
	in
	List.iter (serve Process.do_input) rset;
	List.iter (serve Process.do_output) wset
53
(* Run one round of input then output processing for every domain that is
   not marked bad, still has I/O credit and is not paused for conflict; one
   unit of I/O credit is consumed per domain served. *)
let process_domains store cons domains =
	let serve dom =
		let runnable =
			not (Domain.is_bad_domain dom)
			&& Domain.get_io_credit dom > 0
			&& not (Domain.is_paused_for_conflict dom)
		in
		if runnable then begin
			let con = Connections.find_domain cons (Domain.get_id dom) in
			Process.do_input store cons domains con;
			Process.do_output store cons domains con;
			Domain.decr_io_credit dom
		end
	in
	Domains.iter domains serve
67
(* SIGUSR1 handler: dump [store] into the "db.debug" file under
   [Paths.xen_run_stored] (created or truncated, mode 0o600).  This is
   best effort: any exception raised along the way is discarded. *)
let sigusr1_handler store =
	try
		let oc =
			open_out_gen [ Open_wronly; Open_creat; Open_trunc ] 0o600
				(Paths.xen_run_stored ^ "/db.debug")
		in
		let write_dump () = Store.dump store oc in
		let close_dump () = close_out oc in
		(* the channel is closed whether or not the dump succeeds *)
		finally write_dump close_dump
	with _ -> ()
76
(* SIGHUP handler: invoke the [restart] function of the xenstored logger and
   then of the access logger, for whichever of them is currently set.  The
   signal number is ignored. *)
let sighup_handler _ =
	let restart logger = logger.Logging.restart () in
	maybe restart !Logging.xenstored_logger;
	maybe restart !Logging.access_logger
80
(* Path of the configuration file: the one recorded in [cf.config_file] when
   present, otherwise "oxenstored.conf" under [Define.default_config_dir]. *)
let config_filename cf =
	match cf.config_file with
	| None -> Define.default_config_dir ^ "/oxenstored.conf"
	| Some path -> path
85
(* Pid-file path used when the configuration does not provide one. *)
let default_pidfile = sprintf "%s/xenstored.pid" Paths.xen_run_dir

(* Interval between two safety-net scans of the domain rings, compared
   against [Unix.gettimeofday] differences (i.e. seconds); a negative value
   disables the scan.  Settable through the "ring-scan-interval" key. *)
let ring_scan_interval = ref 20
89
(* Read the configuration file [filename] against the key table below and
   return the pid-file path: [default_pidfile] unless the "pid-file" key
   overrides it.  [Config.Error] and [Sys_error] are reported on stderr and
   do not abort the daemon. *)
let parse_config filename =
	let pidfile_path = ref default_pidfile in
	let options = [
		("merge-activate", Config.Set_bool Transaction.do_coalesce);
		("conflict-burst-limit", Config.Set_float Define.conflict_burst_limit);
		("conflict-max-history-seconds", Config.Set_float Define.conflict_max_history_seconds);
		("conflict-rate-limit-is-aggregate", Config.Set_bool Define.conflict_rate_limit_is_aggregate);
		("perms-activate", Config.Set_bool Perms.activate);
		("quota-activate", Config.Set_bool Quota.activate);
		("quota-maxwatch", Config.Set_int Define.maxwatch);
		("quota-transaction", Config.Set_int Define.maxtransaction);
		("quota-maxentity", Config.Set_int Quota.maxent);
		("quota-maxsize", Config.Set_int Quota.maxsize);
		("quota-maxrequests", Config.Set_int Define.maxrequests);
		("test-eagain", Config.Set_bool Transaction.test_eagain);
		("persistent", Config.Set_bool Disk.enable);
		("xenstored-log-file", Config.String Logging.set_xenstored_log_destination);
		("xenstored-log-level", Config.String
			(fun s -> Logging.xenstored_log_level := Logging.level_of_string s));
		("xenstored-log-nb-files", Config.Set_int Logging.xenstored_log_nb_files);
		("xenstored-log-nb-lines", Config.Set_int Logging.xenstored_log_nb_lines);
		("xenstored-log-nb-chars", Config.Set_int Logging.xenstored_log_nb_chars);
		("access-log-file", Config.String Logging.set_access_log_destination);
		("access-log-nb-files", Config.Set_int Logging.access_log_nb_files);
		("access-log-nb-lines", Config.Set_int Logging.access_log_nb_lines);
		("access-log-nb-chars", Config.Set_int Logging.access_log_nb_chars);
		("access-log-read-ops", Config.Set_bool Logging.access_log_read_ops);
		("access-log-transactions-ops", Config.Set_bool Logging.access_log_transaction_ops);
		("access-log-special-ops", Config.Set_bool Logging.access_log_special_ops);
		("allow-debug", Config.Set_bool Process.allow_debug);
		("ring-scan-interval", Config.Set_int ring_scan_interval);
		("pid-file", Config.Set_string pidfile_path);
		("xenstored-kva", Config.Set_string Domains.xenstored_kva);
		("xenstored-port", Config.Set_string Domains.xenstored_port); ] in
	(* Print one (key, reason) pair carried by [Config.Error]. *)
	let report (key, reason) =
		if reason = "unknown key" then
			eprintf "config: unknown key %s\n" key
		else
			eprintf "config: %s: %s\n" key reason
	in
	(try
		(* NOTE(review): the last argument is presumably the handler for
		   keys absent from [options]; it always raises [Not_found]. *)
		Config.read filename options (fun _ _ -> raise Not_found)
	with
	| Config.Error errs -> List.iter report errs
	| Sys_error msg -> eprintf "error: config: %s\n" msg);
	!pidfile_path
134
(* Saving and restoring the daemon state as a line-oriented text dump.
   The first line is [dump_format_header]; every following line is one of
     dom,<domid>,<mfn>,<port>
     watch,<domid>,<hex path>,<hex token>
     store,<hex path>,<hex perms>,<hex value>
   where the "hex" fields go through [Utils.hexify] / [Utils.unhexify]. *)
module DB = struct

exception Bad_format of string

(* First line of every dump; checked when restoring. *)
let dump_format_header = "$xenstored-dump-format"

(* Read a dump from [chan], calling [domain_f], [watch_f] or [store_f] for
   each record of the matching kind.
   Raises [Bad_format "header"] when the first line is not
   [dump_format_header]; [End_of_file] escapes if the channel is empty.
   A record that is not recognised, or whose handling raises, is logged and
   skipped; reading stops at end of file. *)
let from_channel_f chan domain_f watch_f store_f =
	let path_of_hex s = Store.Path.of_string (Utils.unhexify s) in
	if input_line chan <> dump_format_header then
		raise (Bad_format "header");
	(* Dispatch one already-split record to the matching callback. *)
	let restore line fields =
		match fields with
		| [ "dom"; domid; mfn; port ] ->
			domain_f (int_of_string domid)
			         (Nativeint.of_string mfn)
			         (int_of_string port)
		| [ "watch"; domid; path; token ] ->
			watch_f (int_of_string domid)
			        (Utils.unhexify path) (Utils.unhexify token)
		| [ "store"; path; perms; value ] ->
			store_f (path_of_hex path)
			        (Perms.Node.of_string (Utils.unhexify perms ^ "\000"))
			        (Utils.unhexify value)
		| _ ->
			info "restoring: ignoring unknown line: %s" line
	in
	(* Process one line; false once the end of the channel is reached. *)
	let step () =
		try
			let line = input_line chan in
			let fields = String.split ',' line in
			(try restore line fields
			 with exn ->
				info "restoring: ignoring unknown line: %s (exception: %s)"
				     line (Printexc.to_string exn));
			true
		with End_of_file ->
			false
	in
	let rec loop () = if step () then loop () in
	loop ()

(* Restore a dump read from [chan] into [store], the connections [cons] and
   the domains [doms].  A Xenctrl handle is opened for the duration of the
   restore and closed afterwards, even on failure. *)
let from_channel store cons doms chan =
	(* use unrestricted rights so that permissions cannot get in the way *)
	let op = Store.get_ops store Perms.Connection.full_rights in
	let xc = Xenctrl.interface_open () in

	(* "dom" record: recreate the domain and register its connection *)
	let domain_f domid mfn port =
		let ndom =
			if domid > 0
			then Domains.create xc doms domid mfn port
			else Domains.create0 doms
		in
		Connections.add_domain cons ndom
	in
	(* "watch" record: re-add the watch on the owning domain's connection *)
	let watch_f domid path token =
		let con = Connections.find_domain cons domid in
		ignore (Connections.add_watch cons con path token)
	in
	(* "store" record: write the node, then set its permissions *)
	let store_f path perms value =
		op.Store.write path value;
		op.Store.setperms path perms
	in
	finally (fun () -> from_channel_f chan domain_f watch_f store_f)
	        (fun () -> Xenctrl.interface_close xc)

(* Restore the dump stored in [file].
   Note the argument order: the domains come BEFORE the connections here,
   the opposite of [from_channel].  The parameters used to be named the
   other way round, which only worked because the arguments were swapped
   again when forwarded; the names now match the values actually passed. *)
let from_file store doms cons file =
	let ic = open_in file in
	finally (fun () -> from_channel store cons doms ic)
	        (fun () -> close_in ic)

(* Write a dump of the domain connections of [cons] and of every node of
   [store] to [chan], then flush it. *)
let to_channel store cons chan =
	fprintf chan "%s\n" dump_format_header;

	(* connections related to domains, each written by [Connection.dump] *)
	Connections.iter_domains cons (fun con -> Connection.dump con chan);

	(* the store itself, one "store" record per node *)
	let dump_node path node =
		let name, perms, value = Store.Node.unpack node in
		let fullpath = Store.Path.to_string (Store.Path.of_path_and_name path name) in
		let permstr = Perms.Node.to_string perms in
		fprintf chan "store,%s,%s,%s\n"
			(Utils.hexify fullpath) (Utils.hexify permstr) (Utils.hexify value)
	in
	Store.dump_fct store dump_node;
	flush chan


(* Write a dump to [file] (created or truncated, mode 0o600); the channel is
   closed whether or not the dump succeeds. *)
let to_file store cons file =
	let oc = open_out_gen [ Open_wronly; Open_creat; Open_trunc ] 0o600 file in
	finally (fun () -> to_channel store cons oc)
	        (fun () -> close_out oc)
end
231
(* Daemon entry point: read the configuration, create the listening sockets,
   optionally daemonize, write the pid file(s), build or restore the store,
   install the signal handlers, then run the event loop until SIGTERM sets
   [quit]; the state is dumped to the "db" file on the way out. *)
let () =
	let cf = do_argv in
	let pidfile =
		if Sys.file_exists (config_filename cf) then
			parse_config (config_filename cf)
		else
			default_pidfile
		in

	(* best effort: failing to create the pid-file directory is ignored *)
	(try
		Unixext.mkdir_rec (Filename.dirname pidfile) 0o755
	with _ ->
		()
	);

	let rw_sock, ro_sock =
		if cf.disable_socket then
			None, None
		else
			Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket),
			Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket_ro)
		in

	if cf.daemonize then
		Unixext.daemonize ()
	else
		printf "Xen Storage Daemon, version %d.%d\n%!"
			Define.xenstored_major Define.xenstored_minor;

	(try Unixext.pidfile_write pidfile with _ -> ());

	(* for compatibility with old xenstored *)
	begin match cf.pidfile with
	| Some pidfile -> Unixext.pidfile_write pidfile
	| None         -> () end;

	let store = Store.create () in
	let eventchn = Event.init () in
	(* absolute time (as given by Unix.gettimeofday) of the next frequent_ops run *)
	let next_frequent_ops = ref 0. in
	let advance_next_frequent_ops () =
		next_frequent_ops := (Unix.gettimeofday () +. !Define.conflict_max_history_seconds)
	in
	let delay_next_frequent_ops_by duration =
		next_frequent_ops := !next_frequent_ops +. duration
	in
	let domains = Domains.init eventchn advance_next_frequent_ops in

	(* For things that need to be done periodically but more often
	 * than the periodic_ops function *)
	let frequent_ops () =
		if Unix.gettimeofday () > !next_frequent_ops then (
			History.trim ();
			Domains.incr_conflict_credit domains;
			advance_next_frequent_ops ()
		) in
	let cons = Connections.create () in

	let quit = ref false in

	Logging.init_xenstored_log();

	(* state dump: read back on restart, rewritten on access-log rotation
	 * and on exit *)
	let db_path = Paths.xen_run_stored ^ "/db" in
	if cf.restart && Sys.file_exists db_path then (
		DB.from_file store domains cons db_path;
		Event.bind_dom_exc_virq eventchn
	) else (
		if !Disk.enable then (
			info "reading store from disk";
			Disk.read store
		);

		let localpath = Store.Path.of_string "/local" in
		if not (Store.path_exists store localpath) then
			Store.mkdir store (Perms.Connection.create 0) localpath;

		if cf.domain_init then (
			Connections.add_domain cons (Domains.create0 domains);
			Event.bind_dom_exc_virq eventchn
		);
	);

	Select.use_poll (not cf.use_select);

	Sys.set_signal Sys.sighup (Sys.Signal_handle sighup_handler);
	Sys.set_signal Sys.sigterm (Sys.Signal_handle (fun _ -> quit := true));
	Sys.set_signal Sys.sigusr1 (Sys.Signal_handle (fun _ -> sigusr1_handler store));
	Sys.set_signal Sys.sigpipe Sys.Signal_ignore;

	if cf.activate_access_log then begin
		let post_rotate () = DB.to_file store cons db_path in
		Logging.init_access_log post_rotate
	end;

	(* descriptors handled by process_special_fds rather than as client
	 * connections: the listening sockets and the event channel *)
	let spec_fds =
		(match rw_sock with None -> [] | Some x -> [ x ]) @
		(match ro_sock with None -> [] | Some x -> [ x ]) @
		(if cf.domain_init then [ Event.fd eventchn ] else [])
		in

	let xc = Xenctrl.interface_open () in

	let process_special_fds rset =
		let accept_connection can_write fd =
			let (cfd, _addr) = Unix.accept fd in
			debug "new connection through socket";
			Connections.add_anonymous cons cfd can_write
		and handle_eventchn _fd =
			let port = Event.pending eventchn in
			debug "pending port %d" (Xeneventchn.to_int port);
			(* the port is unmasked whether or not handling it succeeds *)
			finally (fun () ->
				if Some port = eventchn.Event.virq_port then (
					let (notify, deaddom) = Domains.cleanup xc domains in
					List.iter (Connections.del_domain cons) deaddom;
					if deaddom <> [] || notify then
						Connections.fire_spec_watches cons "@releaseDomain"
				)
				else
					let c = Connections.find_domain_by_port cons port in
					match Connection.get_domain c with
					| Some dom -> Domain.incr_io_credit dom | None -> ()
				) (fun () -> Event.unmask eventchn port)
		and do_if_set fd set fct =
			if List.mem fd set then
				fct fd in

		maybe (fun fd -> do_if_set fd rset (accept_connection true)) rw_sock;
		maybe (fun fd -> do_if_set fd rset (accept_connection false)) ro_sock;
		do_if_set (Event.fd eventchn) rset (handle_eventchn)
	in

	let ring_scan_checker dom =
		(* no need to scan domains already marked as for processing *)
		if not (Domain.get_io_credit dom > 0) then
			let con = Connections.find_domain cons (Domain.get_id dom) in
			if not (Connection.has_more_work con) then (
				Process.do_output store cons domains con;
				Process.do_input store cons domains con;
				if Connection.has_more_work con then
					(* Previously thought as no work, but detect some after scan (as
					   processing a new message involves multiple steps.) It's very
					   likely to be a "lazy" client, bump its credit. It could be false
					   positive though (due to time window), but it's no harm to give a
					   domain extra credit. *)
					let n = 32 + 2 * (Domains.number domains) in
					info "found lazy domain %d, credit %d" (Domain.get_id dom) n;
					Domain.set_io_credit ~n dom
			) in

	let last_stat_time = ref 0. in
	let last_scan_time = ref 0. in

	let periodic_ops now =
		debug "periodic_ops starting";
		(* Garbage-collect the string->int dictionary once a sizeable number
		 * of symbols has been created or used; there is no need to be
		 * prompt about it even if loose objects accumulate, since names
		 * are often reused.
		 *)
		if Symbol.created () > 1000 || Symbol.used () > 20000
		then begin
			Symbol.mark_all_as_unused ();
			Store.mark_symbols store;
			Connections.iter cons Connection.mark_symbols;
			History.mark_symbols ();
			Symbol.garbage ()
		end;

		(* scan all the xs rings as a safety net for ill-behaved clients *)
		if !ring_scan_interval >= 0 && now > (!last_scan_time +. float !ring_scan_interval) then
			(last_scan_time := now; Domains.iter domains ring_scan_checker);

		(* make sure we don't print general stats faster than 2 min *)
		if now > (!last_stat_time +. 120.) then (
			info "Transaction conflict statistics for last %F seconds:" (now -. !last_stat_time);
			last_stat_time := now;
			Domains.iter domains (Domain.log_and_reset_conflict_stats (info "Dom%d caused %Ld conflicts"));
			info "%Ld failed transactions; of these no culprit was found for %Ld" !Transaction.failed_commits !Transaction.failed_commits_no_culprit;
			Transaction.reset_conflict_stats ();

			let gc = Gc.stat () in
			let (lanon, lanon_ops, lanon_watchs,
			     ldom, ldom_ops, ldom_watchs) = Connections.stats cons in
			let store_nodes, store_abort, store_coalesce = Store.stats store in
			let symtbl_len = Symbol.stats () in

			info "store stat: nodes(%d) t-abort(%d) t-coalesce(%d)"
			     store_nodes store_abort store_coalesce;
			info "sytbl stat: %d" symtbl_len;
			info "  con stat: anonymous(%d, %d o, %d w) domains(%d, %d o, %d w)"
			     lanon lanon_ops lanon_watchs ldom ldom_ops ldom_watchs;
			info "  mem stat: minor(%.0f) promoted(%.0f) major(%.0f) heap(%d w, %d c) live(%d w, %d b) free(%d w, %d b)"
			     gc.Gc.minor_words gc.Gc.promoted_words gc.Gc.major_words
			     gc.Gc.heap_words gc.Gc.heap_chunks
			     gc.Gc.live_words gc.Gc.live_blocks
			     gc.Gc.free_words gc.Gc.free_blocks
		);
		(* push the next frequent_ops deadline back by the time spent here *)
		let elapsed = Unix.gettimeofday () -. now in
		debug "periodic_ops took %F seconds." elapsed;
		delay_next_frequent_ops_by elapsed
	in

	let period_ops_interval = 15. in
	let period_start = ref 0. in

	let main_loop () =
		let is_peaceful c =
			match Connection.get_domain c with
			| None -> true (* Treat socket-connections as exempt, and free to conflict. *)
			| Some dom -> not (Domain.is_paused_for_conflict dom)
		in
		frequent_ops ();
		let mw = Connections.has_more_work cons in
		let peaceful_mw = List.filter is_peaceful mw in
		List.iter
			(fun c ->
			 match Connection.get_domain c with
			 | None -> () | Some d -> Domain.incr_io_credit d)
			peaceful_mw;
		let start_time = Unix.gettimeofday () in
		(* do not block at all while some peaceful connection has work pending *)
		let timeout =
			let until_next_activity =
				if Domains.all_at_max_credit domains
				then period_ops_interval
				else min (max 0. (!next_frequent_ops -. start_time)) period_ops_interval in
			if peaceful_mw <> [] then 0. else until_next_activity
		in
		let inset, outset = Connections.select ~only_if:is_peaceful cons in
		let rset, wset, _ =
		try
			Select.select (spec_fds @ inset) outset [] timeout
		with Unix.Unix_error(Unix.EINTR, _, _) ->
			[], [], [] in
		let sfds, cfds =
			List.partition (fun fd -> List.mem fd spec_fds) rset in
		if sfds <> [] then
			process_special_fds sfds;

		if cfds <> [] || wset <> [] then
			process_connection_fds store cons domains cfds wset;
		if timeout <> 0. then (
			let now = Unix.gettimeofday () in
			if now > !period_start +. period_ops_interval then
				(period_start := now; periodic_ops now)
		);

		process_domains store cons domains
		in

	Systemd.sd_notify_ready ();
	while not !quit
	do
		try
			main_loop ()
		with exc ->
			error "caught exception %s" (Printexc.to_string exc);
			if cf.reraise_top_level then
				raise exc
	done;
	info "stopping xenstored";
	DB.to_file store cons db_path;
	()
492