1(*
2 * Copyright (C) 2006-2007 XenSource Ltd.
3 * Copyright (C) 2008      Citrix Ltd.
4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU Lesser General Public License as published
8 * by the Free Software Foundation; version 2.1 only. with the special
9 * exception on linking described in file LICENSE.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 * GNU Lesser General Public License for more details.
15 *)
16
(** Connection-level end-of-file exception.  Declaring it here shadows the
    standard library exception of the same name for the rest of this module.
    NOTE(review): nothing in the visible part of this file raises it;
    presumably it is raised or caught by users of this module -- confirm. *)
exception End_of_file
18
19open Stdext
20
(** Largest payload length, in bytes, that [packet_of] lets through unchanged
    on an mmap-backed connection; longer payloads are replaced by an E2BIG
    error packet there. *)
let xenstore_payload_max = 4096 (* xen/include/public/io/xs_wire.h *)
22
type 'a bounded_sender = 'a -> unit option
(** A bounded sender is offered an ['a] item and answers with:
    - [None] when it has no room and the item was not taken
    - [Some ()] when the item was accepted (or sent)
*)

module BoundedPipe : sig
  type 'a t

  (** [create ~capacity ~destination] builds a pipe whose local buffer holds
      at most [capacity] items.  A full buffer refuses further items.
      Buffered items are handed to [destination], oldest first, for as long
      as [destination] keeps accepting them.  [destination] may itself be
      the [push] of another pipe. *)
  val create: capacity:int -> destination:'a bounded_sender -> 'a t

  (** [is_empty t] is [true] when the local buffer of [t] holds no item. *)
  val is_empty : _ t -> bool

  (** [length t] is the number of items currently held in the local buffer. *)
  val length: _ t -> int

  (** [flush_pipe t] forwards as many buffered items as the destination
      accepts, possibly none. *)
  val flush_pipe: _ t -> unit

  (** [push t item] first runs [flush_pipe], then buffers [item] if the
      capacity allows and flushes again.
      Returns [None], leaving [item] out, when the buffer is still full. *)
  val push : 'a t -> 'a bounded_sender
end = struct
  (* items wait in [q] until [destination] accepts them *)
  type 'a t =
    { q: 'a Queue.t
    ; destination: 'a bounded_sender
    ; capacity: int
    }

  let create ~capacity ~destination =
    { q = Queue.create (); destination; capacity }

  let is_empty t = Queue.is_empty t.q

  let length t = Queue.length t.q

  let flush_pipe t =
    (* set once the destination refuses the item at the head of the queue *)
    let stalled = ref false in
    while not !stalled && not (Queue.is_empty t.q) do
      (* only peek: the item must stay queued if the destination has no room *)
      match t.destination (Queue.peek t.q) with
      | Some () -> ignore (Queue.pop t.q)
      | None -> stalled := true
    done

  let push t item =
    (* drain first: this frees room and keeps older items ahead of [item] *)
    flush_pipe t;
    if Queue.length t.q >= t.capacity then
      None
    else begin
      (* always go through the queue rather than calling [destination]
         directly, so the destination sees items in arrival order *)
      Queue.push item t.q;
      flush_pipe t;
      Some ()
    end
end
93
(* A watch registered on a connection, and the connection record itself.
   The two types are mutually recursive: a watch points back at its
   connection, and a connection holds its watches. *)
type watch = {
  (* connection the watch was created for *)
  con: t;
  (* token given at registration; sent back in every watch event *)
  token: string;
  (* path as passed to [watch_create]; [fire_watch] substitutes the path of
     the fired event in a copy of the record *)
  path: string;
  (* result of [get_path con] at creation, i.e. /local/domain/<domid>/ *)
  base: string;
  (* true when [path] starts with neither '/' nor '@' *)
  is_relative: bool;
  (* events of this watch waiting to be queued on the connection's Xb;
     created with capacity [!Define.maxwatchevents] *)
  pending_watchevents: Xenbus.Xb.Packet.t BoundedPipe.t;
}

and t = {
  (* underlying Xenbus transport *)
  xb: Xenbus.Xb.t;
  (* domain behind the connection; [None] for a socket connection *)
  dom: Domain.t option;
  (* open transactions, keyed by transaction id *)
  transactions: (int, Transaction.t) Hashtbl.t;
  (* id tried first when the next transaction is started *)
  mutable next_tid: int;
  (* watches, keyed by the absolute path given to [add_watch] *)
  watches: (string, watch list) Hashtbl.t;
  (* number of watches registered, maintained by [add_watch]/[del_watch] *)
  mutable nb_watches: int;
  (* id naming a connection without a domain; 0 when [dom] is set *)
  anonid: int;
  (* counter incremented by [incr_ops] *)
  mutable stat_nb_ops: int;
  (* permissions of the connection; replaced by [set_target] *)
  mutable perm: Perms.Connection.t;
  (* overflow buffer of watch events produced by this connection, flushed
     into the [pending_watchevents] pipe of each target watch; [can_input]
     is false while it is non-empty *)
  pending_source_watchevents: (watch * Xenbus.Xb.Packet.t) BoundedPipe.t
}
115
module Watch = struct
  module T = struct
    type t = watch

    (** Orders two watches of the same connection by [token], then by
        [path]. *)
    let compare w1 w2 =
      (* watches owned by different connections are not comparable *)
      assert (w1.con == w2.con);
      let by_token = String.compare w1.token w2.token in
      if by_token <> 0 then by_token
      else String.compare w1.path w2.path
  end
  module Set = Set.Make(T)

  (** [flush_events t] flushes the pending events of watch [t] and returns
      [true] when some events are still left behind afterwards. *)
  let flush_events t =
    let pipe = t.pending_watchevents in
    BoundedPipe.flush_pipe pipe;
    not (BoundedPipe.is_empty pipe)

  (** [pending_watchevents t] is the number of events still buffered for
      watch [t]. *)
  let pending_watchevents t =
    BoundedPipe.length t.pending_watchevents
end
136
(** [source_flush_watchevents t] moves as many events as possible out of the
    overflow buffer [pending_source_watchevents] of connection [t]. *)
let source_flush_watchevents t =
  BoundedPipe.flush_pipe t.pending_source_watchevents

(** [source_pending_watchevents t] is the number of events still held in the
    overflow buffer [pending_source_watchevents] of connection [t]. *)
let source_pending_watchevents t =
  BoundedPipe.length t.pending_source_watchevents
142
(** [mark_as_bad con] marks the domain behind [con] as bad through
    [Domain.mark_as_bad]; it does nothing for a connection without a
    domain. *)
let mark_as_bad con =
  match con.dom with
  | Some domain -> Domain.mark_as_bad domain
  | None -> ()
147
(** Value [next_tid] starts from on a new connection and is reset to by
    [do_reconnect]. *)
let initial_next_tid = 1
149
(** [do_reconnect con] reconnects the transport of [con] and resets the
    per-connection state: transactions, transaction ids, watches and the
    operation counter.  [dom], [anonid] and [perm] are left as they are. *)
let do_reconnect con =
  (* the transport goes first; the resets below only run if it succeeds *)
  Xenbus.Xb.reconnect con.xb;
  (* drop every open transaction and restart id allocation *)
  Hashtbl.clear con.transactions;
  con.next_tid <- initial_next_tid;
  (* drop every watch together with its counter *)
  Hashtbl.clear con.watches;
  con.nb_watches <- 0;
  (* restart the statistics *)
  con.stat_nb_ops <- 0
161
(** [get_path con] is the store prefix [/local/domain/<domid>/] of [con],
    using domid 0 when the connection has no domain. *)
let get_path con =
  let domid =
    match con.dom with
    | Some d -> Domain.get_id d
    | None -> 0
  in
  Printf.sprintf "/local/domain/%i/" domid
164
(** [watch_create ~con ~path ~token] builds the watch record for [path] and
    [token] on [con].  Its event pipe holds up to [!Define.maxwatchevents]
    packets and drains into the output queue of [con.xb].
    Indexing [path.[0]] raises [Invalid_argument] when [path] is empty. *)
let watch_create ~con ~path ~token =
  (* absolute paths start with '/', special ones with '@' *)
  let is_relative =
    match path.[0] with
    | '/' | '@' -> false
    | _ -> true
  in
  let pending_watchevents =
    BoundedPipe.create
      ~capacity:!Define.maxwatchevents
      ~destination:(Xenbus.Xb.queue con.xb)
  in
  { con; token; path; base = get_path con; is_relative; pending_watchevents }
173
(** [get_con w] is the connection stored in watch [w]. *)
let get_con w = w.con

(** [number_of_transactions con] is the number of entries in the transaction
    table of [con]. *)
let number_of_transactions con =
  Hashtbl.length con.transactions

(** [get_domain con] is the domain of [con], if any. *)
let get_domain con = con.dom

(* next id handed out by [create] to a connection that has no domain *)
let anon_id_next = ref 1
182
(** [get_domstr con] is a short name for [con]: the letter D followed by the
    domain id, or the letter A followed by the anonymous id when the
    connection has no domain. *)
let get_domstr con =
  let prefix, id =
    match con.dom with
    | Some dom -> "D", Domain.get_id dom
    | None -> "A", con.anonid
  in
  prefix ^ string_of_int id
187
(** [make_perm dom] is the default connection permission, READ and WRITE,
    for the id of [dom], or for id 0 when [dom] is [None]. *)
let make_perm dom =
  let domid = match dom with Some d -> Domain.get_id d | None -> 0 in
  Perms.Connection.create ~perms:[Perms.READ; Perms.WRITE] domid
195
(** [create xbcon dom] builds a connection over transport [xbcon] for the
    optional domain [dom], with empty transaction and watch tables and
    default permissions, and logs the new connection. *)
let create xbcon dom =
  (* a connection without a domain takes a fresh anonymous id; others use 0 *)
  let anonid =
    match dom with
    | Some _ -> 0
    | None ->
      let fresh = !anon_id_next in
      anon_id_next := fresh + 1;
      fresh
  in
  (* events leaving the overflow buffer go to the pipe of their own watch *)
  let forward_to_watch (watch, pkt) =
    BoundedPipe.push watch.pending_watchevents pkt
  in
  (* The real capacity of the overflow buffer is lower than the figure
     given here: whatever does not fit elsewhere ends up in it, bounded only
     by the number of watch events one xenstore command can generate
     (finite, although possibly very large in theory for Dom0).  As soon as
     it holds anything, the connection is blocked from sending more
     commands until it is empty again. *)
  let pending_source_watchevents =
    BoundedPipe.create ~capacity:Sys.max_array_length
      ~destination:forward_to_watch
  in
  let con =
    { xb = xbcon
    ; dom
    ; transactions = Hashtbl.create 5
    ; next_tid = initial_next_tid
    ; watches = Hashtbl.create 8
    ; nb_watches = 0
    ; anonid
    ; stat_nb_ops = 0
    ; perm = make_perm dom
    ; pending_source_watchevents
    }
  in
  Logging.new_connection ~tid:Transaction.none ~con:(get_domstr con);
  con
230
(** [get_fd con] is the file descriptor reported by the transport of [con]. *)
let get_fd con = Xenbus.Xb.get_fd con.xb
(** [close con] logs the end of the connection, then closes its transport. *)
let close con =
  Logging.end_connection ~tid:Transaction.none ~con:(get_domstr con);
  Xenbus.Xb.close con.xb
235
(** [get_perm con] is the current permission record of [con]. *)
let get_perm con =
  con.perm

(** [set_target con target_domid] replaces the permission of [con] with the
    result of [Perms.Connection.set_target] for [target_domid], granting READ
    and WRITE. *)
let set_target con target_domid =
  con.perm <- Perms.Connection.set_target (get_perm con) ~perms:[Perms.READ; Perms.WRITE] target_domid

(** [is_backend_mmap con] tells whether the transport of [con] is
    mmap-backed, as reported by [Xenbus.Xb.is_mmap]. *)
let is_backend_mmap con = Xenbus.Xb.is_mmap con.xb
243
(** [packet_of con tid rid ty data] builds the packet to send on [con].
    When [data] is longer than [xenstore_payload_max] and the connection is
    mmap-backed, an E2BIG error packet is built instead of the requested
    one. *)
let packet_of con tid rid ty data =
  let oversized = String.length data > xenstore_payload_max in
  (* the backend is only queried when the payload is actually too long *)
  let ty, data =
    if oversized && is_backend_mmap con
    then Xenbus.Xb.Op.Error, "E2BIG\000"
    else ty, data
  in
  Xenbus.Xb.Packet.create tid rid ty data
249
(** [send_reply con tid rid ty data] queues the packet built by [packet_of]
    on the transport of [con].  The queue refusing the packet is treated as
    a programming error and trips the assertion. *)
let send_reply con tid rid ty data =
  let result = Xenbus.Xb.queue con.xb (packet_of con tid rid ty data) in
  (* should never happen: we only process an input packet when there is room for an output packet *)
  (* and the limit for replies is different from the limit for watch events *)
  assert (result <> None)

(** [send_error con tid rid err] replies with an Error packet whose payload
    is [err] followed by a NUL byte. *)
let send_error con tid rid err = send_reply con tid rid Xenbus.Xb.Op.Error (err ^ "\000")
(** [send_ack con tid rid ty] replies with a packet of type [ty] whose
    payload is OK followed by a NUL byte. *)
let send_ack con tid rid ty = send_reply con tid rid ty "OK\000"
258
(** [get_watch_path con path] is the absolute form of [path]: paths starting
    with '@' or '/' are returned unchanged, any other path is appended to
    [get_path con].
    Indexing [path.[0]] raises [Invalid_argument] when [path] is empty. *)
let get_watch_path con path =
  match path.[0] with
  | '@' | '/' -> path
  | _ -> get_path con ^ path
265
(** [get_watches con path] is the list of watches registered on [con] under
    the key [path], or the empty list when there are none.
    The table is probed once instead of a [mem] followed by a [find]. *)
let get_watches (con: t) path =
  try Hashtbl.find con.watches path
  with Not_found -> []
270
(** [get_children_watches con path] collects the watches of [con] whose key
    starts with [path] followed by a slash, i.e. watches registered strictly
    below [path]. *)
let get_children_watches con path =
  let prefix = path ^ "/" in
  Hashtbl.fold
    (fun watched ws acc ->
       if String.startswith prefix watched then ws @ acc else acc)
    con.watches []
275
(** [is_dom0 con] tells whether the permissions of [con] are those of Dom0,
    as decided by [Perms.Connection.is_dom0]. *)
let is_dom0 con =
  con |> get_perm |> Perms.Connection.is_dom0
278
(** [add_watch con (path, apath) token] registers a watch on [con] and
    returns it.  [path] is stored in the watch, [apath] is the key it is
    filed under.
    Raises [Quota.Limit_reached] when quotas are active, [!Define.maxwatch]
    is positive, [con] is not Dom0 and its watch count exceeds the limit.
    Raises [Define.Already_exist] when [apath] already has a watch with the
    same [token].
    NOTE(review): the comparison is a strict [>], so a connection can reach
    [!Define.maxwatch + 1] watches before being refused -- confirm this is
    intended. *)
let add_watch con (path, apath) token =
  let quota_enforced =
    !Quota.activate && !Define.maxwatch > 0 && not (is_dom0 con)
  in
  if quota_enforced && con.nb_watches > !Define.maxwatch then
    raise Quota.Limit_reached;
  let existing = get_watches con apath in
  let same_token w = w.token = token in
  if List.exists same_token existing then
    raise Define.Already_exist;
  let watch = watch_create ~con ~token ~path in
  Hashtbl.replace con.watches apath (watch :: existing);
  con.nb_watches <- con.nb_watches + 1;
  watch
290
(** [del_watch con path token] removes the watch of [con] registered on
    [path] with [token] and returns its absolute path together with the
    removed watch.
    Raises [Not_found] when nothing is registered under that path or no
    watch there carries [token]. *)
let del_watch con path token =
  let apath = get_watch_path con path in
  let ws = Hashtbl.find con.watches apath in
  let w = List.find (fun w -> w.token = token) ws in
  (* test for emptiness by pattern instead of walking the list for its length *)
  begin match Utils.list_remove w ws with
    | [] -> Hashtbl.remove con.watches apath
    | remaining -> Hashtbl.replace con.watches apath remaining
  end;
  con.nb_watches <- con.nb_watches - 1;
  apath, w
302
(** [del_watches con] drops every watch of [con] and zeroes its watch
    counter. *)
let del_watches con =
  Hashtbl.reset con.watches;
  con.nb_watches <- 0

(** [del_transactions con] drops every open transaction of [con]. *)
let del_transactions con =
  Hashtbl.reset con.transactions
309
(** [list_watches con] is the list of [(path, token)] pairs of every watch
    registered on [con]. *)
let list_watches con =
  let describe watch = watch.path, watch.token in
  Hashtbl.fold
    (fun _ ws acc -> List.map describe ws @ acc)
    con.watches []
315
(* logging shorthands: both pass the fixed name "connection" as the first
   argument of the corresponding [Logging] function *)
let dbg fmt = Logging.debug "connection" fmt
let info fmt = Logging.info "connection" fmt
318
(** [lookup_watch_perm path root_opt] is the list of node permissions that
    apply to [path] in the given root: those of the parent node, followed by
    those of the node itself when it exists.  The result is empty when there
    is no root, or when walking to [path] fails with [Define.Invalid_path]
    or [Not_found]. *)
let lookup_watch_perm path = function
  | None -> []
  | Some root ->
    let perms_of parent name =
      let own =
        (* the node may be missing; the parent permissions still count *)
        try [Store.Node.get_perms (Store.Node.find parent name)]
        with Not_found -> []
      in
      Store.Node.get_perms parent :: own
    in
    try Store.Path.apply root path perms_of
    with Define.Invalid_path | Not_found -> []
327
(** [lookup_watch_perms oldroot root path] gathers the permissions that
    apply to [path] in the optional previous root [oldroot], followed by
    those in the current [root]. *)
let lookup_watch_perms oldroot root path =
  let before = lookup_watch_perm path oldroot in
  let after = lookup_watch_perm path (Some root) in
  before @ after
330
(** [fire_single_watch_unchecked source watch] builds the watch event for
    [watch] and pushes it on the overflow buffer of the connection [source],
    without any permission check.
    Raises [Failure] when that buffer refuses the event. *)
let fire_single_watch_unchecked source watch =
  let payload = Utils.join_by_null [watch.path; watch.token; ""] in
  let event =
    packet_of watch.con Transaction.none 0 Xenbus.Xb.Op.Watchevent payload
  in
  (* a refusal is not expected from a well behaved Dom0; if it does happen
     it most likely is a Dom0 bug causing runaway memory usage *)
  if BoundedPipe.push source.pending_source_watchevents (watch, event) = None then
    failwith "watch event overflow, cannot happen"
342
(** [fire_single_watch source (oldroot, root) watch] fires [watch] when the
    connection owning it passes [Perms.can_fire_watch] against the
    permissions found for the watched path in both roots.  Otherwise the
    event is dropped and the refusal is logged. *)
let fire_single_watch source (oldroot, root) watch =
  let abspath =
    Store.Path.of_string (get_watch_path watch.con watch.path)
  in
  let perms = lookup_watch_perms oldroot root abspath in
  if not (Perms.can_fire_watch watch.con.perm perms) then begin
    let shown =
      String.concat ", " (List.map (Perms.Node.to_string ~sep:" ") perms)
    in
    let con = get_domstr watch.con in
    Logging.watch_not_fired ~con shown (Store.Path.to_string abspath)
  end else
    fire_single_watch_unchecked source watch
352
(** [fire_watch source roots watch path] fires [watch] for a change at
    [path].  For a relative watch and an absolute [path], the first
    [String.length watch.base] characters of [path] are cut off before
    reporting.
    NOTE(review): the cut assumes [path] is at least as long as
    [watch.base]; [String.sub] raises [Invalid_argument] otherwise, and
    [path.[0]] does so for an empty [path] -- confirm callers guarantee
    both. *)
let fire_watch source roots watch path =
  let reported =
    if watch.is_relative && path.[0] = '/' then
      let skip = String.length watch.base in
      String.sub path skip (String.length path - skip)
    else
      path
  in
  fire_single_watch source roots { watch with path = reported }
364
365(* Search for a valid unused transaction id. *)
let rec valid_transaction_id con proposed_id =
  (*
   * Clip proposed_id to the range [1, 0x3ffffffe].
   *
   * The chosen id must not truncate when written into the uint32_t tx_id
   * field, and must fit in the positive range of a 31 bit OCaml integer so
   * that it also works when compiled as 32bit.
   *
   * Oxenstored therefore supports only 1 billion open transactions.
   *)
  let in_range = proposed_id > 0 && proposed_id < 0x3fffffff in
  let id = if in_range then proposed_id else 1 in
  if not (Hashtbl.mem con.transactions id) then
    id
  else
    (* an open transaction already uses this id: try the next one *)
    valid_transaction_id con (id + 1)
383
(** [start_transaction con store] opens a transaction on [store] for [con]
    and returns its id.
    Raises [Quota.Transaction_opened] when [!Define.maxtransaction] is
    positive, [con] is not Dom0 and the number of open transactions exceeds
    that limit. *)
let start_transaction con store =
  let quota_applies = !Define.maxtransaction > 0 && not (is_dom0 con) in
  if quota_applies
  && Hashtbl.length con.transactions > !Define.maxtransaction then
    raise Quota.Transaction_opened;
  let tid = valid_transaction_id con con.next_tid in
  (* the search for the following transaction starts right after this id *)
  con.next_tid <- tid + 1;
  Hashtbl.add con.transactions tid (Transaction.make tid store);
  Logging.start_transaction ~tid ~con:(get_domstr con);
  tid
394
(** [end_transaction con tid commit] closes transaction [tid] of [con].
    With [commit = None] the transaction is simply dropped and the result is
    [true].  With [commit = Some replay] the result is that of
    [Transaction.commit], falling back to [replay con trans] when the commit
    returns [false].
    Raises [Not_found] when [tid] is not an open transaction. *)
let end_transaction con tid commit =
  let trans = Hashtbl.find con.transactions tid in
  (* the transaction leaves the table whether or not the commit succeeds *)
  Hashtbl.remove con.transactions tid;
  let domstr = get_domstr con in
  Logging.end_transaction ~tid ~con:domstr;
  match commit with
  | Some replay -> Transaction.commit ~con:domstr trans || replay con trans
  | None -> true
403
(** [get_transaction con tid] is the open transaction of [con] with id
    [tid].  The lookup is a plain [Hashtbl.find], so it raises [Not_found]
    when there is no such transaction. *)
let get_transaction con tid =
  Hashtbl.find con.transactions tid
406
(* thin wrappers over the [Xenbus.Xb] transport held in [con.xb] *)
let do_input con = Xenbus.Xb.input con.xb
let has_partial_input con = Xenbus.Xb.has_partial_input con.xb
let has_more_input con = Xenbus.Xb.has_more_input con.xb

(* input is accepted only when the transport allows it and no watch event is
   still waiting in the overflow buffer [pending_source_watchevents] *)
let can_input con = Xenbus.Xb.can_input con.xb && BoundedPipe.is_empty con.pending_source_watchevents
let has_output con = Xenbus.Xb.has_output con.xb
let has_old_output con = Xenbus.Xb.has_old_output con.xb
let has_new_output con = Xenbus.Xb.has_new_output con.xb
let peek_output con = Xenbus.Xb.peek_output con.xb
let do_output con = Xenbus.Xb.output con.xb
417
(** [is_bad con] tells whether the domain behind [con] is flagged as bad; a
    connection without a domain is never bad. *)
let is_bad con =
  match con.dom with
  | Some dom -> Domain.is_bad_domain dom
  | None -> false
419
420(* oxenstored currently only dumps limited information about its state.
421   A live update is only possible if any of the state that is not dumped would be empty.
422   Compared to https://xenbits.xen.org/docs/unstable/designs/xenstore-migration.html:
423     * GLOBAL_DATA: not strictly needed, systemd is giving the socket FDs to us
424     * CONNECTION_DATA: PARTIAL
425       * for domains: PARTIAL, see Connection.dump -> Domain.dump, only if data and tdomid is empty
426       * for sockets (Dom0 toolstack): NO
427     * WATCH_DATA: OK, see Connection.dump
428     * TRANSACTION_DATA: NO
429     * NODE_DATA: OK (except for transactions), see Store.dump_fct and DB.to_channel
430
431   Also xenstored will never talk to a Domain once it is marked as bad,
432   so treat it as idle for live-update.
433
434   Restrictions below can be relaxed once xenstored learns to dump more
435   of its live state in a safe way *)
let has_extra_connection_data con =
  (* a request that has only partly arrived *)
  let partial_in = has_partial_input con in
  (* replies or events not yet written out *)
  let queued_out = has_output con in
  (* permissions changed by set_target are not dumped yet *)
  let custom_perms = con.perm <> make_perm con.dom in
  (* TODO: what about SIGTERM, should use systemd to store FDS
     (dom0 sockets are not dumped yet, so a has_socket test would belong
     in the disjunction below) *)
  partial_in || queued_out || custom_perms
444
(** [has_transaction_data con] logs the number of open transactions of
    [con] at debug level and tells whether there is at least one. *)
let has_transaction_data con =
  let open_count = number_of_transactions con in
  dbg "%s: number of transactions = %d" (get_domstr con) open_count;
  open_count > 0
449
(** [prevents_live_update con] is [true] when [con] holds state that is not
    dumped (extra connection data or open transactions).  A connection
    whose domain is marked bad never blocks a live update. *)
let prevents_live_update con =
  if is_bad con then false
  else has_extra_connection_data con || has_transaction_data con
452
(** [has_more_work con] is [true] when [con] has more input that it may
    consume now, or has new output while no old output is pending. *)
let has_more_work con =
  let input_ready = has_more_input con && can_input con in
  input_ready || (not (has_old_output con) && has_new_output con)
455
(** [incr_ops con] adds one to the operation counter of [con]. *)
let incr_ops con = con.stat_nb_ops <- con.stat_nb_ops + 1

(** [stats con] is the pair of the number of entries in the watch table and
    the operation counter.
    NOTE(review): the first component counts watched paths (table keys),
    not individual watches as [nb_watches] does -- confirm this is the
    intended statistic. *)
let stats con =
  Hashtbl.length con.watches, con.stat_nb_ops
460
(** [dump con chan] writes the state of [con] to [chan]: first the domain
    (through [Domain.dump]) or a [socket,<fd>] line, then one
    [watch,<id>,<path>,<token>] line per watch, with path and token
    hexified.  The id is the domain id, or the negated file descriptor for
    a connection without a domain. *)
let dump con chan =
  let id =
    match con.dom with
    | None ->
      let fd = Utils.FD.to_int (get_fd con) in
      Printf.fprintf chan "socket,%d\n" fd;
      -fd
    | Some dom ->
      let domid = Domain.get_id dom in
      (* the domain record comes before its watches *)
      Domain.dump dom chan;
      domid
  in
  let dump_watch (path, token) =
    Printf.fprintf chan "watch,%d,%s,%s\n" id (Utils.hexify path) (Utils.hexify token)
  in
  List.iter dump_watch (list_watches con)
477
(** [debug con] renders the watches of [con] as text, one line per watch
    holding the connection name, the path and the token. *)
let debug con =
  let domid = get_domstr con in
  list_watches con
  |> List.map (fun (path, token) -> Printf.sprintf "watch %s: %s %s\n" domid path token)
  |> String.concat ""
482
(** [decr_conflict_credit doms con] charges a transaction conflict to the
    domain behind [con] through [Domains.decr_conflict_credit]. *)
let decr_conflict_credit doms con =
  match con.dom with
  | Some dom -> Domains.decr_conflict_credit doms dom
  | None ->
    (* socket connection: we do not know which domain we are in, so it is
       treated as free to conflict *)
    ()
487