(*
 * Copyright (C) 2006-2007 XenSource Ltd.
 * Copyright (C) 2008 Citrix Ltd.
 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *)

exception End_of_file

open Stdext

let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)

type 'a bounded_sender = 'a -> unit option
(** a bounded sender accepts an ['a] item and returns:
    None - if there is no room to accept the item
    Some () - if it has successfully accepted/sent the item
*)

module BoundedPipe : sig
  type 'a t

  (** [create ~capacity ~destination] creates a bounded pipe with a
      local buffer holding at most [capacity] items. Once the buffer is
      full it will not accept further items. Items from the pipe are
      flushed into [destination] as long as it accepts items. The
      destination could be another pipe.
  *)
  val create: capacity:int -> destination:'a bounded_sender -> 'a t

  (** [is_empty t] returns whether the local buffer of [t] is empty. *)
  val is_empty : _ t -> bool

  (** [length t] the number of items in the internal buffer *)
  val length: _ t -> int

  (** [flush_pipe t] sends as many items from the local buffer as possible,
      which could be none. *)
  val flush_pipe: _ t -> unit

  (** [push t item] tries to [flush_pipe] and then push [item]
      into the pipe if its [capacity] allows.
      Returns [None] if there is no more room
  *)
  val push : 'a t -> 'a bounded_sender
end = struct
  (* items are enqueued in [q], and then flushed to [destination] *)
  type 'a t =
    { q: 'a Queue.t
    ; destination: 'a bounded_sender
    ; capacity: int
    }

  let create ~capacity ~destination =
    { q = Queue.create (); capacity; destination }

  let rec flush_pipe t =
    if not (Queue.is_empty t.q) then
      (* only peek: the item must stay queued if [destination] refuses it *)
      match t.destination (Queue.peek t.q) with
      | None -> () (* no room *)
      | Some () ->
        (* successfully sent item to next stage, drop it from our buffer *)
        ignore (Queue.pop t.q);
        (* continue trying to send more items *)
        flush_pipe t

  let push t item =
    (* first try to flush as many items from this pipe as possible to make
       room, it is important to do this first to preserve the order of the
       items
    *)
    flush_pipe t;
    if Queue.length t.q < t.capacity then begin
      (* enqueue, instead of sending directly.
         this ensures that [destination] sees the items in the same order
         as we receive them
      *)
      Queue.push item t.q;
      flush_pipe t;
      Some ()
    end else None

  let is_empty t = Queue.is_empty t.q
  let length t = Queue.length t.q
end

(* A watch registered on a connection. [base] is the result of [get_path]
   for the owning connection, [is_relative] records whether [path] was given
   without a leading '/' or '@' (see [watch_create]). *)
type watch = {
  con: t;
  token: string;
  path: string;
  base: string;
  is_relative: bool;
  pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
}

(* Per-connection state. [watches] maps a watched path to the watches
   registered on it; [nb_watches] is maintained by [add_watch]/[del_watch]. *)
and t = {
  xb: Xenbus.Xb.t;
  dom: Domain.t option;
  transactions: (int, Transaction.t) Hashtbl.t;
  mutable next_tid: int;
  watches: (string, watch list) Hashtbl.t;
  mutable nb_watches: int;
  anonid: int;
  mutable stat_nb_ops: int;
  mutable perm: Perms.Connection.t;
  pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
}

module Watch = struct
  module T = struct
    type t = watch

    (* Orders watches by token, then by path. *)
    let compare w1 w2 =
      (* cannot compare watches from different connections *)
      assert (w1.con == w2.con);
      match String.compare w1.token w2.token with
      | 0 -> String.compare w1.path w2.path
      | n -> n
  end
  module Set = Set.Make(T)

  (* Flushes the watch's pending events; returns [true] if some events
     are still left in the buffer afterwards. *)
  let flush_events t =
    BoundedPipe.flush_pipe t.pending_watchevents;
    not (BoundedPipe.is_empty t.pending_watchevents)

  (* Number of events currently buffered for this watch. *)
  let pending_watchevents t =
    BoundedPipe.length t.pending_watchevents
end

(* Flushes the connection-level overflow buffer of watch events. *)
let source_flush_watchevents t =
  BoundedPipe.flush_pipe t.pending_source_watchevents

(* Number of watch events held in the connection-level overflow buffer. *)
let source_pending_watchevents t =
  BoundedPipe.length t.pending_source_watchevents

(* Marks the connection's domain as bad; no-op when there is no domain. *)
let mark_as_bad con =
  match con.dom with
  | None -> ()
  | Some domain -> Domain.mark_as_bad domain

let initial_next_tid = 1

(* Reconnects the underlying transport and resets transactions, watches
   and the operation counter. *)
let do_reconnect con =
  Xenbus.Xb.reconnect con.xb;
  (* dom is the same *)
  Hashtbl.clear con.transactions;
  con.next_tid <- initial_next_tid;
  Hashtbl.clear con.watches;
  (* anonid is the same *)
  con.nb_watches <- 0;
  con.stat_nb_ops <- 0;
  (* perm is the same *)
  ()

(* Returns "/local/domain/<id>/", using id 0 when the connection has no
   domain. *)
let get_path con =
  Printf.sprintf "/local/domain/%i/" (match con.dom with None -> 0 | Some d -> Domain.get_id d)

(* Builds a watch for [con]. Note that [path.[0]] raises [Invalid_argument]
   on an empty [path]. Events of the watch are sent to the connection's
   transport queue. *)
let watch_create ~con ~path ~token = {
  con;
  token;
  path;
  base = get_path con;
  is_relative = path.[0] <> '/' && path.[0] <> '@';
  pending_watchevents = BoundedPipe.create ~capacity:!Define.maxwatchevents ~destination:(Xenbus.Xb.queue con.xb)
}

let get_con w = w.con

let number_of_transactions con =
  Hashtbl.length con.transactions

let get_domain con = con.dom

(* Next id to hand out to a connection without a domain, see [create]. *)
let anon_id_next = ref 1

(* Short identifier for logging: "D<domid>" for domain connections,
   "A<anonid>" otherwise. *)
let get_domstr con =
  match con.dom with
  | None -> "A" ^ (string_of_int con.anonid)
  | Some dom -> "D" ^ (string_of_int (Domain.get_id dom))

(* Default permissions of a connection: READ and WRITE for the domain id,
   or for id 0 when there is no domain. *)
let make_perm dom =
  let domid =
    match dom with
    | None -> 0
    | Some d -> Domain.get_id d
  in
  Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid

(* Creates the connection state for transport [xbcon] and optional domain
   [dom], and logs the new connection. *)
let create xbcon dom =
  (* events leaving the overflow buffer go to the buffer of their watch *)
  let destination (watch, pkt) =
    BoundedPipe.push watch.pending_watchevents pkt
  in
  let id =
    match dom with
    | Some _ -> 0
    | None -> let old = !anon_id_next in incr anon_id_next; old
  in
  let con =
    {
      xb = xbcon;
      dom;
      transactions = Hashtbl.create 5;
      next_tid = initial_next_tid;
      watches = Hashtbl.create 8;
      nb_watches = 0;
      anonid = id;
      stat_nb_ops = 0;
      perm = make_perm dom;

      (* the actual capacity will be lower, this is used as an overflow
         buffer: anything that doesn't fit elsewhere gets put here, only
         limited by the amount of watches that you can generate with a
         single xenstore command (which is finite, although possibly very
         large in theory for Dom0). Once the pipe here has any contents the
         domain is blocked from sending more commands until it is empty
         again though.
      *)
      pending_source_watchevents = BoundedPipe.create ~capacity:Sys.max_array_length ~destination
    }
  in
  Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
  con

let get_fd con = Xenbus.Xb.get_fd con.xb

(* Logs the end of the connection and closes the transport. *)
let close con =
  Logging.end_connection ~tid:Transaction.none ~con:(get_domstr con);
  Xenbus.Xb.close con.xb

let get_perm con =
  con.perm

let set_target con target_domid =
  con.perm <- Perms.Connection.set_target (get_perm con) ~perms:[Perms.READ; Perms.WRITE] target_domid

let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb

(* Builds a packet for [con]; on an mmap backend a payload larger than
   [xenstore_payload_max] is replaced by an E2BIG error packet. *)
let packet_of con tid rid ty data =
  if (String.length data) > xenstore_payload_max && (is_backend_mmap con) then
    Xenbus.Xb.Packet.create tid rid Xenbus.Xb.Op.Error "E2BIG\000"
  else
    Xenbus.Xb.Packet.create tid rid ty data

let send_reply con tid rid ty data =
  let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
  (* should never happen: we only process an input packet when there is room for an output packet *)
  (* and the limit for replies is different from the limit for watch events *)
  assert (result <> None)

let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"

(* Returns [path] unchanged when it starts with '@' or '/', otherwise
   prefixes it with [get_path con]. Raises [Invalid_argument] on an empty
   [path] (from [path.[0]]). *)
let get_watch_path con path =
  if path.[0] = '@' || path.[0] = '/' then
    path
  else
    get_path con ^ path

(* Watches registered on exactly [path], or [] when there are none. *)
let get_watches (con: t) path =
  (* a single lookup instead of [mem] followed by [find] *)
  try Hashtbl.find con.watches path
  with Not_found -> []

(* Watches registered on paths starting with [path ^ "/"]. *)
let get_children_watches con path =
  let path = path ^ "/" in
  List.concat (Hashtbl.fold (fun p w l ->
    if String.startswith path p then w :: l else l) con.watches [])

let is_dom0 con =
  Perms.Connection.is_dom0 (get_perm con)

(* Registers a watch with [token] under the key [apath] and returns it.
   Raises [Quota.Limit_reached] when the watch quota applies and is
   exceeded, and [Define.Already_exist] when [apath] already has a watch
   with the same token. *)
let add_watch con (path, apath) token =
  if !Quota.activate && !Define.maxwatch > 0 &&
     not (is_dom0 con) && con.nb_watches > !Define.maxwatch then
    raise Quota.Limit_reached;
  let l = get_watches con apath in
  if List.exists (fun w -> w.token = token) l then
    raise Define.Already_exist;
  let watch = watch_create ~con ~token ~path in
  Hashtbl.replace con.watches apath (watch :: l);
  con.nb_watches <- con.nb_watches + 1;
  watch

(* Removes the watch with [token] on [path] and returns its key together
   with the removed watch. Raises [Not_found] when no such watch exists. *)
let del_watch con path token =
  let apath = get_watch_path con path in
  let ws = Hashtbl.find con.watches apath in
  let w = List.find (fun w -> w.token = token) ws in
  (* drop the table entry altogether once its last watch is gone *)
  begin match Utils.list_remove w ws with
    | [] -> Hashtbl.remove con.watches apath
    | remaining -> Hashtbl.replace con.watches apath remaining
  end;
  con.nb_watches <- con.nb_watches - 1;
  apath, w

let del_watches con =
  Hashtbl.reset con.watches;
  con.nb_watches <- 0

let del_transactions con =
  Hashtbl.reset con.transactions

(* All watches of the connection as (path, token) pairs. *)
let list_watches con =
  let ll = Hashtbl.fold
      (fun _ watches acc -> List.map (fun watch -> watch.path, watch.token) watches :: acc)
      con.watches [] in
  List.concat ll

let dbg fmt = Logging.debug "connection" fmt
let info fmt = Logging.info "connection" fmt

(* Permissions relevant for a watch on [path] in the given optional root:
   those of the parent node, followed by those of the node itself when it
   exists. Returns [] when there is no root or the path cannot be resolved. *)
let lookup_watch_perm path = function
  | None -> []
  | Some root ->
    let perms_of parent name =
      let own =
        try [Store.Node.get_perms (Store.Node.find parent name)]
        with Not_found -> []
      in
      Store.Node.get_perms parent :: own
    in
    try Store.Path.apply root path perms_of
    with Define.Invalid_path | Not_found -> []

(* Permissions for [path] in the old root (if any) followed by those in
   the current root. *)
let lookup_watch_perms oldroot root path =
  lookup_watch_perm path oldroot @ lookup_watch_perm path (Some root)

(* Queues a watch event for [watch] on the overflow buffer of [source],
   without any permission check. *)
let fire_single_watch_unchecked source watch =
  let pkt =
    [watch.path; watch.token; ""]
    |> Utils.join_by_null
    |> packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent
  in
  match BoundedPipe.push source.pending_source_watchevents (watch, pkt) with
  | None ->
    (* a well behaved Dom0 shouldn't be able to trigger this,
       if it happens it is likely a Dom0 bug causing runaway memory usage
    *)
    failwith "watch event overflow, cannot happen"
  | Some () -> () (* packet queued *)

(* Fires [watch] if the permissions of its connection allow it, otherwise
   logs that the watch was not fired. *)
let fire_single_watch source (oldroot, root) watch =
  let abspath = Store.Path.of_string (get_watch_path watch.con watch.path) in
  let perms = lookup_watch_perms oldroot root abspath in
  if not (Perms.can_fire_watch watch.con.perm perms) then begin
    let perms_str =
      String.concat ", " (List.map (Perms.Node.to_string ~sep:" ") perms)
    in
    let con = get_domstr watch.con in
    Logging.watch_not_fired ~con perms_str (Store.Path.to_string abspath)
  end else
    fire_single_watch_unchecked source watch

(* Fires [watch] for the modified [path]. For a relative watch and an
   absolute [path], the first [String.length watch.base] characters are
   stripped from the reported path. *)
let fire_watch source roots watch path =
  let reported =
    if watch.is_relative && path.[0] = '/' then
      let skip = String.length watch.base in
      String.sub path skip (String.length path - skip)
    else
      path
  in
  fire_single_watch source roots { watch with path = reported }

(* Search for a valid unused transaction id. *)
let rec valid_transaction_id con proposed_id =
  (*
   * Clip proposed_id to the range [1, 0x3ffffffe]
   *
   * The chosen id must not truncate when written into the uint32_t tx_id
   * field, and needs to fit within the positive range of a 31 bit ocaml
   * integer to function when compiled as 32bit.
   *
   * Oxenstored therefore supports only 1 billion open transactions.
   *)
  let id =
    if proposed_id > 0 && proposed_id < 0x3fffffff then proposed_id else 1
  in
  if Hashtbl.mem con.transactions id
  then
    (* Outstanding transaction with this id. Try the next. *)
    valid_transaction_id con (id + 1)
  else
    id

(* Opens a new transaction on [store] and returns its id. Raises
   [Quota.Transaction_opened] when the transaction quota applies and is
   exceeded. *)
let start_transaction con store =
  let over_quota =
    !Define.maxtransaction > 0 && not (is_dom0 con)
    && Hashtbl.length con.transactions > !Define.maxtransaction
  in
  if over_quota then raise Quota.Transaction_opened;
  let tid = valid_transaction_id con con.next_tid in
  con.next_tid <- tid + 1;
  let fresh = Transaction.make tid store in
  Hashtbl.add con.transactions tid fresh;
  Logging.start_transaction ~tid ~con:(get_domstr con);
  tid

(* Removes transaction [tid] from the connection. With [commit = None] it
   simply returns [true]; with [Some replay] it tries to commit and, if
   that returns [false], falls back to [replay]. Raises [Not_found] for an
   unknown [tid]. *)
let end_transaction con tid commit =
  let finished = Hashtbl.find con.transactions tid in
  Hashtbl.remove con.transactions tid;
  Logging.end_transaction ~tid ~con:(get_domstr con);
  match commit with
  | Some transaction_replay_f ->
    Transaction.commit ~con:(get_domstr con) finished
    || transaction_replay_f con finished
  | None -> true

(* Raises [Not_found] for an unknown [tid]. *)
let get_transaction con tid =
  Hashtbl.find con.transactions tid

(* Thin wrappers around the transport of the connection. *)
let do_input con = Xenbus.Xb.input con.xb
let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
let has_more_input con = Xenbus.Xb.has_more_input con.xb

(* Input is only accepted while the overflow buffer of watch events is
   empty. *)
let can_input con =
  Xenbus.Xb.can_input con.xb
  && BoundedPipe.is_empty con.pending_source_watchevents
let has_output con = Xenbus.Xb.has_output con.xb
let has_old_output con = Xenbus.Xb.has_old_output con.xb
let has_new_output con = Xenbus.Xb.has_new_output con.xb
let peek_output con = Xenbus.Xb.peek_output con.xb
let do_output con = Xenbus.Xb.output con.xb

(* A connection without a domain is never bad. *)
let is_bad con =
  match con.dom with
  | Some dom -> Domain.is_bad_domain dom
  | None -> false

(* oxenstored currently only dumps limited information about its state.
   A live update is only possible if any of the state that is not dumped would be empty.
   Compared to https://xenbits.xen.org/docs/unstable/designs/xenstore-migration.html:
   * GLOBAL_DATA: not strictly needed, systemd is giving the socket FDs to us
   * CONNECTION_DATA: PARTIAL
     * for domains: PARTIAL, see Connection.dump -> Domain.dump, only if data and tdomid is empty
     * for sockets (Dom0 toolstack): NO
   * WATCH_DATA: OK, see Connection.dump
   * TRANSACTION_DATA: NO
   * NODE_DATA: OK (except for transactions), see Store.dump_fct and DB.to_channel

   Also xenstored will never talk to a Domain once it is marked as bad,
   so treat it as idle for live-update.

   Restrictions below can be relaxed once xenstored learns to dump more
   of its live state in a safe way *)
let has_extra_connection_data con =
  let partial_in = has_partial_input con in
  let pending_out = has_output con in
  (* set_target not dumped yet *)
  let nondefault_perms = make_perm con.dom <> con.perm in
  (* TODO: what about SIGTERM, should use systemd to store FDS
     || has_socket (* dom0 sockets not dumped yet *) *)
  partial_in || pending_out || nondefault_perms

(* Logs the number of open transactions and returns whether there are any. *)
let has_transaction_data con =
  let count = number_of_transactions con in
  dbg "%s: number of transactions = %d" (get_domstr con) count;
  count > 0

let prevents_live_update con =
  not (is_bad con)
  && (has_extra_connection_data con || has_transaction_data con)

let has_more_work con =
  (has_more_input con && can_input con)
  || (not (has_old_output con) && has_new_output con)

let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1

(* Number of entries in the watch table and number of operations. *)
let stats con =
  Hashtbl.length con.watches, con.stat_nb_ops

(* Writes the connection to [chan]: first the domain (via [Domain.dump]) or
   a "socket,<fd>" line, then one "watch,<id>,<path>,<token>" line per
   watch with path and token hexified. For sockets, <id> is the negated
   file descriptor. *)
let dump con chan =
  let id =
    match con.dom with
    | None ->
      let fd = Utils.FD.to_int (get_fd con) in
      Printf.fprintf chan "socket,%d\n" fd;
      -fd
    | Some dom ->
      let domid = Domain.get_id dom in
      (* dump domain *)
      Domain.dump dom chan;
      domid
  in
  (* dump watches *)
  let dump_watch (path, token) =
    Printf.fprintf chan "watch,%d,%s,%s\n" id (Utils.hexify path) (Utils.hexify token)
  in
  List.iter dump_watch (list_watches con)

(* Human-readable listing of the connection's watches, one per line. *)
let debug con =
  let domid = get_domstr con in
  list_watches con
  |> List.map (fun (path, token) -> Printf.sprintf "watch %s: %s %s\n" domid path token)
  |> String.concat ""

(* Decrements the conflict credit of the connection's domain, if any. *)
let decr_conflict_credit doms con =
  match con.dom with
  | Some dom -> Domains.decr_conflict_credit doms dom
  | None ->
    (* It's a socket connection. We don't know which domain we're in,
       so treat it as if it's free to conflict *)
    ()