(*
 * Copyright (C) 2006-2007 XenSource Ltd.
 * Copyright (C) 2008 Citrix Ltd.
 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
 * Author Thomas Gazagnaire <thomas.gazagnaire@eu.citrix.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *)

open Printf
open Parse_arg
open Stdext

let error fmt = Logging.error "xenstored" fmt
let debug fmt = Logging.debug "xenstored" fmt
let info fmt = Logging.info "xenstored" fmt

(*------------ event klass processors --------------*)

(* Run [Process.do_input] on every socket connection whose fd is in [rset]
   and [Process.do_output] on every one whose fd is in [wset].
   A read error, a write error or [Xenbus.Xb.End_of_file] removes the
   anonymous connection; fds that do not map to a connection are skipped. *)
let process_connection_fds store cons domains rset wset =
  let try_fct fct c =
    try
      fct store cons domains c
    with
    | Unix.Unix_error(err, "write", _) ->
      Connections.del_anonymous cons c;
      error "closing socket connection: write error: %s"
        (Unix.error_message err)
    | Unix.Unix_error(err, "read", _) ->
      Connections.del_anonymous cons c;
      (* a peer reset is an ordinary disconnect: drop it without logging *)
      if err <> Unix.ECONNRESET then
        error "closing socket connection: read error: %s"
          (Unix.error_message err)
    | Xenbus.Xb.End_of_file ->
      Connections.del_anonymous cons c;
      debug "closing socket connection"
  in
  let process_fdset_with fds fct =
    List.iter
      (fun fd ->
         (* NOTE(review): this handler also swallows any Not_found raised
            inside [fct], not only the one from [Connections.find]. *)
         try try_fct fct (Connections.find cons fd)
         with Not_found -> ()
      ) fds in
  process_fdset_with rset Process.do_input;
  process_fdset_with wset Process.do_output

(* Service every domain ring once.  A domain is skipped when it is marked
   bad, has no I/O credit left, or is paused for transaction conflicts;
   otherwise one input step and one output step are run on its connection
   and one unit of I/O credit is consumed. *)
let process_domains store cons domains =
  let do_io_domain domain =
    if Domain.is_bad_domain domain
    || Domain.get_io_credit domain <= 0
    || Domain.is_paused_for_conflict domain
    then () (* nothing to do *)
    else (
      let con = Connections.find_domain cons (Domain.get_id domain) in
      Process.do_input store cons domains con;
      Process.do_output store cons domains con;
      Domain.decr_io_credit domain
    ) in
  Domains.iter domains do_io_domain

(* SIGUSR1: dump the whole store to <xen_run_stored>/db.debug (mode 0600).
   Best-effort: any failure is logged and never propagated to the caller. *)
let sigusr1_handler store =
  try
    let channel = open_out_gen [ Open_wronly; Open_creat; Open_trunc; ]
        0o600 (Paths.xen_run_stored ^ "/db.debug") in
    finally (fun () -> Store.dump store channel)
      (fun () -> close_out channel)
  with exn ->
    error "failed to dump store on SIGUSR1: %s" (Printexc.to_string exn)

(* SIGHUP: ask the xenstored logger and the access logger, when each is
   configured, to restart. *)
let sighup_handler _ =
  maybe (fun logger -> logger.Logging.restart()) !Logging.xenstored_logger;
  maybe (fun logger -> logger.Logging.restart()) !Logging.access_logger

(* Path of the configuration file: the one given on the command line,
   else <default_config_dir>/oxenstored.conf. *)
let config_filename cf =
  match cf.config_file with
  | Some name -> name
  | None -> Define.default_config_dir ^ "/oxenstored.conf"

let default_pidfile = Paths.xen_run_dir ^ "/xenstored.pid"

(* Seconds between ring scans in [periodic_ops]; a negative value disables
   the scan. *)
let ring_scan_interval = ref 20

(* Read [filename] and apply each recognised key to the global setting it
   controls.  Configuration errors and an unreadable file are reported on
   stderr instead of aborting start-up.  Unrecognised keys reach the fallback,
   which raises Not_found (presumably turned into an "unknown key" error by
   [Config.read] -- confirm in config.ml).
   Returns the pid-file path: the "pid-file" key if set, else
   [default_pidfile]. *)
let parse_config filename =
  let pidfile = ref default_pidfile in
  let options = [
    ("merge-activate", Config.Set_bool Transaction.do_coalesce);
    ("conflict-burst-limit", Config.Set_float Define.conflict_burst_limit);
    ("conflict-max-history-seconds", Config.Set_float Define.conflict_max_history_seconds);
    ("conflict-rate-limit-is-aggregate", Config.Set_bool Define.conflict_rate_limit_is_aggregate);
    ("perms-activate", Config.Set_bool Perms.activate);
    ("quota-activate", Config.Set_bool Quota.activate);
    ("quota-maxwatch", Config.Set_int Define.maxwatch);
    ("quota-transaction", Config.Set_int Define.maxtransaction);
    ("quota-maxentity", Config.Set_int Quota.maxent);
    ("quota-maxsize", Config.Set_int Quota.maxsize);
    ("quota-maxrequests", Config.Set_int Define.maxrequests);
    ("test-eagain", Config.Set_bool Transaction.test_eagain);
    ("persistent", Config.Set_bool Disk.enable);
    ("xenstored-log-file", Config.String Logging.set_xenstored_log_destination);
    ("xenstored-log-level", Config.String
       (fun s -> Logging.xenstored_log_level := Logging.level_of_string s));
    ("xenstored-log-nb-files", Config.Set_int Logging.xenstored_log_nb_files);
    ("xenstored-log-nb-lines", Config.Set_int Logging.xenstored_log_nb_lines);
    ("xenstored-log-nb-chars", Config.Set_int Logging.xenstored_log_nb_chars);
    ("access-log-file", Config.String Logging.set_access_log_destination);
    ("access-log-nb-files", Config.Set_int Logging.access_log_nb_files);
    ("access-log-nb-lines", Config.Set_int Logging.access_log_nb_lines);
    ("access-log-nb-chars", Config.Set_int Logging.access_log_nb_chars);
    ("access-log-read-ops", Config.Set_bool Logging.access_log_read_ops);
    ("access-log-transactions-ops", Config.Set_bool Logging.access_log_transaction_ops);
    ("access-log-special-ops", Config.Set_bool Logging.access_log_special_ops);
    ("allow-debug", Config.Set_bool Process.allow_debug);
    ("ring-scan-interval", Config.Set_int ring_scan_interval);
    ("pid-file", Config.Set_string pidfile);
    ("xenstored-kva", Config.Set_string Domains.xenstored_kva);
    ("xenstored-port", Config.Set_string Domains.xenstored_port); ] in
  begin try Config.read filename options (fun _ _ -> raise Not_found)
    with
    | Config.Error err -> List.iter (fun (k, e) ->
        match e with
        | "unknown key" -> eprintf "config: unknown key %s\n" k
        | _ -> eprintf "config: %s: %s\n" k e
      ) err
    | Sys_error m -> eprintf "error: config: %s\n" m
  end;
  !pidfile

(* Save/restore of the daemon state (domain connections, watches and store
   nodes) as a line-oriented text dump. *)
module DB = struct

  exception Bad_format of string

  let dump_format_header = "$xenstored-dump-format"

  (* Parse a dump from [chan] until end-of-file.
     The first line must equal [dump_format_header], else
     [Bad_format "header"] is raised.  Each following line is split on ','
     and dispatched on its first field:
       dom,<domid>,<mfn>,<port>         -> [domain_f]
       watch,<domid>,<path>,<token>     -> [watch_f]  (path, token hex-encoded)
       store,<path>,<perms>,<value>     -> [store_f]  (all three hex-encoded)
     Unrecognised lines, and lines whose callback raises, are logged and
     skipped. *)
  let from_channel_f chan domain_f watch_f store_f =
    let unhexify s = Utils.unhexify s in
    let getpath s = Store.Path.of_string (Utils.unhexify s) in
    let header = input_line chan in
    if header <> dump_format_header then
      raise (Bad_format "header");
    let quit = ref false in
    while not !quit
    do
      try
        let line = input_line chan in
        let l = String.split ',' line in
        try
          match l with
          | "dom" :: domid :: mfn :: port :: []->
            domain_f (int_of_string domid)
              (Nativeint.of_string mfn)
              (int_of_string port)
          | "watch" :: domid :: path :: token :: [] ->
            watch_f (int_of_string domid)
              (unhexify path) (unhexify token)
          | "store" :: path :: perms :: value :: [] ->
            (* NOTE(review): a NUL is appended before parsing; presumably
               Perms.Node.of_string expects a NUL-terminated string. *)
            store_f (getpath path)
              (Perms.Node.of_string (unhexify perms ^ "\000"))
              (unhexify value)
          | _ ->
            info "restoring: ignoring unknown line: %s" line
        with exn ->
          info "restoring: ignoring unknown line: %s (exception: %s)"
            line (Printexc.to_string exn);
          ()
      with End_of_file ->
        quit := true
    done;
    ()

  (* Rebuild domains, their connections, watches and store nodes from
     [chan].  Store writes use full rights so permissions never block the
     restore.  The Xenctrl handle opened here is always closed. *)
  let from_channel store cons doms chan =
    (* don't let the permission get on our way, full perm ! *)
    let op = Store.get_ops store Perms.Connection.full_rights in
    let xc = Xenctrl.interface_open () in

    let domain_f domid mfn port =
      let ndom =
        if domid > 0 then
          Domains.create xc doms domid mfn port
        else
          Domains.create0 doms
      in
      Connections.add_domain cons ndom
    in
    let watch_f domid path token =
      let con = Connections.find_domain cons domid in
      ignore (Connections.add_watch cons con path token)
    in
    let store_f path perms value =
      op.Store.write path value;
      op.Store.setperms path perms
    in
    finally (fun () -> from_channel_f chan domain_f watch_f store_f)
      (fun () -> Xenctrl.interface_close xc)

  (* Restore state from the dump in [file]; the channel is always closed.
     Positional order is [store doms cons file].  It differs from
     [from_channel]'s [store cons doms], and callers rely on it.  The
     parameters are named after what callers actually pass. *)
  let from_file store doms cons file =
    let channel = open_in file in
    finally (fun () -> from_channel store cons doms channel)
      (fun () -> close_in channel)

  (* Write a dump readable by [from_channel_f] to [chan]: the header, every
     domain connection (via [Connection.dump]), then one
     "store,<path>,<perms>,<value>" line per node with hex-encoded fields.
     The channel is flushed but not closed. *)
  let to_channel store cons chan =
    let hexify s = Utils.hexify s in

    fprintf chan "%s\n" dump_format_header;

    (* dump connections related to domains; domid, mfn, eventchn port, watches *)
    Connections.iter_domains cons (fun con -> Connection.dump con chan);

    (* dump the store *)
    Store.dump_fct store (fun path node ->
        let name, perms, value = Store.Node.unpack node in
        let fullpath = Store.Path.to_string (Store.Path.of_path_and_name path name) in
        let permstr = Perms.Node.to_string perms in
        fprintf chan "store,%s,%s,%s\n" (hexify fullpath) (hexify permstr) (hexify value)
      );
    flush chan;
    ()

  (* Dump state to [file] (created or truncated, mode 0600); the channel is
     always closed. *)
  let to_file store cons file =
    let channel = open_out_gen [ Open_wronly; Open_creat; Open_trunc; ] 0o600 file in
    finally (fun () -> to_channel store cons channel)
      (fun () -> close_out channel)
end

(* Daemon entry point.  The sequence is:
   - read the configuration and write the pid file(s);
   - open the sockets and optionally daemonize;
   - restore or initialise the store;
   - install signal handlers;
   - run the select loop until SIGTERM;
   - dump the state to <xen_run_stored>/db on the way out. *)
let () =
  let cf = do_argv in
  let pidfile =
    if Sys.file_exists (config_filename cf) then
      parse_config (config_filename cf)
    else
      default_pidfile
  in

  (try
     Unixext.mkdir_rec (Filename.dirname pidfile) 0o755
   with _ ->
     ()
  );

  let rw_sock, ro_sock =
    if cf.disable_socket then
      None, None
    else
      Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket),
      Some (Unix.handle_unix_error Utils.create_unix_socket Define.xs_daemon_socket_ro)
  in

  if cf.daemonize then
    Unixext.daemonize ()
  else
    printf "Xen Storage Daemon, version %d.%d\n%!"
      Define.xenstored_major Define.xenstored_minor;

  (try Unixext.pidfile_write pidfile with _ -> ());

  (* for compatibility with old xenstored *)
  begin match cf.pidfile with
    | Some pidfile -> Unixext.pidfile_write pidfile
    | None -> () end;

  let store = Store.create () in
  let eventchn = Event.init () in
  let next_frequent_ops = ref 0. in
  let advance_next_frequent_ops () =
    next_frequent_ops := (Unix.gettimeofday () +. !Define.conflict_max_history_seconds)
  in
  let delay_next_frequent_ops_by duration =
    next_frequent_ops := !next_frequent_ops +. duration
  in
  let domains = Domains.init eventchn advance_next_frequent_ops in

  (* For things that need to be done periodically but more often
   * than the periodic_ops function *)
  let frequent_ops () =
    if Unix.gettimeofday () > !next_frequent_ops then (
      History.trim ();
      Domains.incr_conflict_credit domains;
      advance_next_frequent_ops ()
    ) in
  let cons = Connections.create () in

  let quit = ref false in

  Logging.init_xenstored_log();

  let filename = Paths.xen_run_stored ^ "/db" in
  if cf.restart && Sys.file_exists filename then (
    DB.from_file store domains cons filename;
    Event.bind_dom_exc_virq eventchn
  ) else (
    if !Disk.enable then (
      info "reading store from disk";
      Disk.read store
    );

    let localpath = Store.Path.of_string "/local" in
    if not (Store.path_exists store localpath) then
      Store.mkdir store (Perms.Connection.create 0) localpath;

    if cf.domain_init then (
      Connections.add_domain cons (Domains.create0 domains);
      Event.bind_dom_exc_virq eventchn
    )
  );

  Select.use_poll (not cf.use_select);

  Sys.set_signal Sys.sighup (Sys.Signal_handle sighup_handler);
  Sys.set_signal Sys.sigterm (Sys.Signal_handle (fun _ -> quit := true));
  Sys.set_signal Sys.sigusr1 (Sys.Signal_handle (fun _ -> sigusr1_handler store));
  Sys.set_signal Sys.sigpipe Sys.Signal_ignore;

  if cf.activate_access_log then begin
    let post_rotate () = DB.to_file store cons (Paths.xen_run_stored ^ "/db") in
    Logging.init_access_log post_rotate
  end;

  (* Listening sockets plus, when domain_init is set, the event-channel fd:
     these are handled by [process_special_fds] rather than as connections. *)
  let spec_fds =
    (match rw_sock with None -> [] | Some x -> [ x ]) @
    (match ro_sock with None -> [] | Some x -> [ x ]) @
    (if cf.domain_init then [ Event.fd eventchn ] else [])
  in

  let xc = Xenctrl.interface_open () in

  (* Handle the ready special fds in [rset]: accept new socket connections
     (read-write or read-only) and service the pending event-channel port. *)
  let process_special_fds rset =
    let accept_connection can_write fd =
      let (cfd, _) = Unix.accept fd in
      debug "new connection through socket";
      Connections.add_anonymous cons cfd can_write
    and handle_eventchn _fd =
      let port = Event.pending eventchn in
      debug "pending port %d" (Xeneventchn.to_int port);
      (* the port is unmasked again whatever happens below *)
      finally (fun () ->
          if Some port = eventchn.Event.virq_port then (
            let (notify, deaddom) = Domains.cleanup xc domains in
            List.iter (Connections.del_domain cons) deaddom;
            if deaddom <> [] || notify then
              Connections.fire_spec_watches cons "@releaseDomain"
          )
          else
            let c = Connections.find_domain_by_port cons port in
            match Connection.get_domain c with
            | Some dom -> Domain.incr_io_credit dom | None -> ()
        ) (fun () -> Event.unmask eventchn port)
    and do_if_set fd set fct =
      if List.mem fd set then
        fct fd in

    maybe (fun fd -> do_if_set fd rset (accept_connection true)) rw_sock;
    maybe (fun fd -> do_if_set fd rset (accept_connection false)) ro_sock;
    do_if_set (Event.fd eventchn) rset (handle_eventchn)
  in

  (* Safety net for domains that wrote to their ring without notifying us:
     for a domain with no I/O credit and no pending work, try one output
     and one input step, and grant extra credit if work then appears. *)
  let ring_scan_checker dom =
    (* no need to scan domains already marked as for processing *)
    if not (Domain.get_io_credit dom > 0) then
      let con = Connections.find_domain cons (Domain.get_id dom) in
      if not (Connection.has_more_work con) then (
        Process.do_output store cons domains con;
        Process.do_input store cons domains con;
        if Connection.has_more_work con then
          (* Previously thought as no work, but detect some after scan (as
             processing a new message involves multiple steps.) It's very
             likely to be a "lazy" client, bump its credit. It could be false
             positive though (due to time window), but it's no harm to give a
             domain extra credit. *)
          let n = 32 + 2 * (Domains.number domains) in
          info "found lazy domain %d, credit %d" (Domain.get_id dom) n;
          Domain.set_io_credit ~n dom
      ) in

  let last_stat_time = ref 0. in
  let last_scan_time = ref 0. in

  (* Housekeeping run from an idle main-loop iteration: symbol-table GC,
     ring scans every [!ring_scan_interval] seconds, and statistics logging
     at most every 120 seconds.  The time spent is added to the next
     frequent-ops deadline. *)
  let periodic_ops now =
    debug "periodic_ops starting";
    (* we garbage collect the string->int dictionary after a sizeable amount of operations,
     * there's no need to be really fast even if we got loose
     * objects since names are often reused.
     *)
    if Symbol.created () > 1000 || Symbol.used () > 20000
    then begin
      Symbol.mark_all_as_unused ();
      Store.mark_symbols store;
      Connections.iter cons Connection.mark_symbols;
      History.mark_symbols ();
      Symbol.garbage ()
    end;

    (* scan all the xs rings as a safety net for ill-behaved clients *)
    if !ring_scan_interval >= 0 && now > (!last_scan_time +. float !ring_scan_interval) then
      (last_scan_time := now; Domains.iter domains ring_scan_checker);

    (* make sure we don't print general stats faster than 2 min *)
    if now > (!last_stat_time +. 120.) then (
      info "Transaction conflict statistics for last %F seconds:" (now -. !last_stat_time);
      last_stat_time := now;
      Domains.iter domains (Domain.log_and_reset_conflict_stats (info "Dom%d caused %Ld conflicts"));
      info "%Ld failed transactions; of these no culprit was found for %Ld" !Transaction.failed_commits !Transaction.failed_commits_no_culprit;
      Transaction.reset_conflict_stats ();

      let gc = Gc.stat () in
      let (lanon, lanon_ops, lanon_watchs,
           ldom, ldom_ops, ldom_watchs) = Connections.stats cons in
      let store_nodes, store_abort, store_coalesce = Store.stats store in
      let symtbl_len = Symbol.stats () in

      info "store stat: nodes(%d) t-abort(%d) t-coalesce(%d)"
        store_nodes store_abort store_coalesce;
      info "sytbl stat: %d" symtbl_len;
      info " con stat: anonymous(%d, %d o, %d w) domains(%d, %d o, %d w)"
        lanon lanon_ops lanon_watchs ldom ldom_ops ldom_watchs;
      info " mem stat: minor(%.0f) promoted(%.0f) major(%.0f) heap(%d w, %d c) live(%d w, %d b) free(%d w, %d b)"
        gc.Gc.minor_words gc.Gc.promoted_words gc.Gc.major_words
        gc.Gc.heap_words gc.Gc.heap_chunks
        gc.Gc.live_words gc.Gc.live_blocks
        gc.Gc.free_words gc.Gc.free_blocks
    );
    let elapsed = Unix.gettimeofday () -. now in
    debug "periodic_ops took %F seconds." elapsed;
    delay_next_frequent_ops_by elapsed
  in

  let period_ops_interval = 15. in
  let period_start = ref 0. in

  (* One iteration of the event loop.  Connections with pending work that
     are not paused for conflict get I/O credit, and then select is called.
     The timeout is 0 while such work exists; otherwise it runs until the
     next frequent-ops deadline, capped at [period_ops_interval].  Ready fds
     are dispatched, periodic_ops runs only on non-zero-timeout iterations
     (at most every [period_ops_interval] seconds), and finally every domain
     ring is serviced. *)
  let main_loop () =
    let is_peaceful c =
      match Connection.get_domain c with
      | None -> true (* Treat socket-connections as exempt, and free to conflict. *)
      | Some dom -> not (Domain.is_paused_for_conflict dom)
    in
    frequent_ops ();
    let mw = Connections.has_more_work cons in
    let peaceful_mw = List.filter is_peaceful mw in
    List.iter
      (fun c ->
         match Connection.get_domain c with
         | None -> () | Some d -> Domain.incr_io_credit d)
      peaceful_mw;
    let start_time = Unix.gettimeofday () in
    let timeout =
      let until_next_activity =
        if Domains.all_at_max_credit domains
        then period_ops_interval
        else min (max 0. (!next_frequent_ops -. start_time)) period_ops_interval in
      if peaceful_mw <> [] then 0. else until_next_activity
    in
    let inset, outset = Connections.select ~only_if:is_peaceful cons in
    let rset, wset, _ =
      try
        Select.select (spec_fds @ inset) outset [] timeout
      with Unix.Unix_error(Unix.EINTR, _, _) ->
        [], [], [] in
    let sfds, cfds =
      List.partition (fun fd -> List.mem fd spec_fds) rset in
    if sfds <> [] then
      process_special_fds sfds;

    if cfds <> [] || wset <> [] then
      process_connection_fds store cons domains cfds wset;
    if timeout <> 0. then (
      let now = Unix.gettimeofday () in
      if now > !period_start +. period_ops_interval then
        (period_start := now; periodic_ops now)
    );

    process_domains store cons domains
  in

  Systemd.sd_notify_ready ();
  while not !quit
  do
    try
      main_loop ()
    with exc ->
      error "caught exception %s" (Printexc.to_string exc);
      if cf.reraise_top_level then
        raise exc
  done;
  info "stopping xenstored";
  DB.to_file store cons (Paths.xen_run_stored ^ "/db");
  ()