1(*
2 * Copyright (C) 2006-2007 XenSource Ltd.
3 * Copyright (C) 2008      Citrix Ltd.
4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
5 * Author Thomas Gazagnaire <thomas.gazagnaire@eu.citrix.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published
9 * by the Free Software Foundation; version 2.1 only. with the special
10 * exception on linking described in file LICENSE.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU Lesser General Public License for more details.
16 *)
17
18let debug fmt = Logging.debug "connections" fmt
19
20type t = {
21  anonymous: (Unix.file_descr, Connection.t) Hashtbl.t;
22  domains: (int, Connection.t) Hashtbl.t;
23  ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
24  mutable watches: Connection.watch list Trie.t;
25  mutable has_pending_watchevents: Connection.Watch.Set.t
26}
27
28let create () = {
29  anonymous = Hashtbl.create 37;
30  domains = Hashtbl.create 37;
31  ports = Hashtbl.create 37;
32  watches = Trie.create ();
33  has_pending_watchevents = Connection.Watch.Set.empty;
34}
35
36let get_capacity () =
37  (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
38  { Xenbus.Xb.maxoutstanding = !Define.maxoutstanding; maxwatchevents = !Define.maxwatchevents }
39
40let add_anonymous cons fd =
41  let capacity = get_capacity () in
42  let xbcon = Xenbus.Xb.open_fd fd ~capacity in
43  let con = Connection.create xbcon None in
44  Hashtbl.add cons.anonymous (Xenbus.Xb.get_fd xbcon) con
45
46let add_domain cons dom =
47  let capacity = get_capacity () in
48  let xbcon = Xenbus.Xb.open_mmap ~capacity (Domain.get_interface dom) (fun () -> Domain.notify dom) in
49  let con = Connection.create xbcon (Some dom) in
50  Hashtbl.add cons.domains (Domain.get_id dom) con;
51  Hashtbl.add cons.ports (Domain.get_local_port dom) con
52
53let select ?(only_if = (fun _ -> true)) cons =
54  Hashtbl.fold (fun _ con (ins, outs) ->
55      if (only_if con) then (
56        let fd = Connection.get_fd con in
57        let in_fds = if Connection.can_input con then fd :: ins else ins in
58        let out_fds = if Connection.has_output con then fd :: outs else outs in
59        in_fds, out_fds
60      ) else (ins, outs)
61    )
62    cons.anonymous ([], [])
63
64let find cons =
65  Hashtbl.find cons.anonymous
66
67let find_domain cons =
68  Hashtbl.find cons.domains
69
70let find_domain_by_port cons port =
71  Hashtbl.find cons.ports port
72
73let del_watches_of_con con watches =
74  match List.filter (fun w -> Connection.get_con w != con) watches with
75  | [] -> None
76  | ws -> Some ws
77
78let del_watches cons con =
79  Connection.del_watches con;
80  cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
81  cons.has_pending_watchevents <-
82    cons.has_pending_watchevents |> Connection.Watch.Set.filter @@ fun w ->
83    Connection.get_con w != con
84
85let del_anonymous cons con =
86  try
87    Hashtbl.remove cons.anonymous (Connection.get_fd con);
88    del_watches cons con;
89    Connection.close con
90  with exn ->
91    debug "del anonymous %s" (Printexc.to_string exn)
92
93let del_domain cons id =
94  try
95    let con = find_domain cons id in
96    Hashtbl.remove cons.domains id;
97    (match Connection.get_domain con with
98     | Some d -> Hashtbl.remove cons.ports (Domain.get_local_port d)
99     | None -> ());
100    del_watches cons con;
101    Connection.close con
102  with exn ->
103    debug "del domain %u: %s" id (Printexc.to_string exn)
104
105let iter_domains cons fct =
106  Hashtbl.iter (fun _ c -> fct c) cons.domains
107
108let iter_anonymous cons fct =
109  Hashtbl.iter (fun _ c -> fct c) cons.anonymous
110
111let iter cons fct =
112  iter_domains cons fct; iter_anonymous cons fct
113
114let has_more_work cons =
115  Hashtbl.fold
116    (fun _id con acc ->
117       if Connection.has_more_work con then con :: acc else acc)
118    cons.domains []
119
120let key_of_str path =
121  if path.[0] = '@'
122  then [path]
123  else "" :: Store.Path.to_string_list (Store.Path.of_string path)
124
125let key_of_path path =
126  "" :: Store.Path.to_string_list path
127
128let add_watch cons con path token =
129  let apath = Connection.get_watch_path con path in
130  (* fail on invalid paths early by calling key_of_str before adding watch *)
131  let key = key_of_str apath in
132  let watch = Connection.add_watch con (path, apath) token in
133  let watches =
134    if Trie.mem cons.watches key
135    then Trie.find cons.watches key
136    else []
137  in
138  cons.watches <- Trie.set cons.watches key (watch :: watches);
139  watch
140
141let del_watch cons con path token =
142  let apath, watch = Connection.del_watch con path token in
143  let key = key_of_str apath in
144  let watches = Utils.list_remove watch (Trie.find cons.watches key) in
145  if watches = [] then
146    cons.watches <- Trie.unset cons.watches key
147  else
148    cons.watches <- Trie.set cons.watches key watches;
149  watch
150
151(* path is absolute *)
152let fire_watches ?oldroot source root cons path recurse =
153  let key = key_of_path path in
154  let path = Store.Path.to_string path in
155  let roots = oldroot, root in
156  let fire_watch _ = function
157    | None         -> ()
158    | Some watches -> List.iter (fun w -> Connection.fire_watch source roots w path) watches
159  in
160  let fire_rec _x = function
161    | None         -> ()
162    | Some watches ->
163      List.iter (Connection.fire_single_watch source roots) watches
164  in
165  Trie.iter_path fire_watch cons.watches key;
166  if recurse then
167    Trie.iter fire_rec (Trie.sub cons.watches key)
168
169let send_watchevents cons con =
170  cons.has_pending_watchevents <-
171    cons.has_pending_watchevents |> Connection.Watch.Set.filter Connection.Watch.flush_events;
172  Connection.source_flush_watchevents con
173
174let fire_spec_watches root cons specpath =
175  let source = find_domain cons 0 in
176  iter cons (fun con ->
177      List.iter (Connection.fire_single_watch source (None, root)) (Connection.get_watches con specpath))
178
179let set_target cons domain target_domain =
180  let con = find_domain cons domain in
181  Connection.set_target con target_domain
182
183let number_of_transactions cons =
184  let res = ref 0 in
185  let aux con =
186    res := Connection.number_of_transactions con + !res
187  in
188  iter cons aux;
189  !res
190
191let stats cons =
192  let nb_ops_anon = ref 0
193  and nb_watchs_anon = ref 0
194  and nb_ops_dom = ref 0
195  and nb_watchs_dom = ref 0 in
196  iter_anonymous cons (fun con ->
197      let con_watchs, con_ops = Connection.stats con in
198      nb_ops_anon := !nb_ops_anon + con_ops;
199      nb_watchs_anon := !nb_watchs_anon + con_watchs;
200    );
201  iter_domains cons (fun con ->
202      let con_watchs, con_ops = Connection.stats con in
203      nb_ops_dom := !nb_ops_dom + con_ops;
204      nb_watchs_dom := !nb_watchs_dom + con_watchs;
205    );
206  (Hashtbl.length cons.anonymous, !nb_ops_anon, !nb_watchs_anon,
207   Hashtbl.length cons.domains, !nb_ops_dom, !nb_watchs_dom)
208
209let debug cons =
210  let anonymous = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.anonymous [] in
211  let domains = Hashtbl.fold (fun _ con accu -> Connection.debug con :: accu) cons.domains [] in
212  String.concat "" (domains @ anonymous)
213
214let debug_watchevents cons con =
215  (* == (physical equality)
216     	   has to be used here because w.con.xb.backend might contain a [unit->unit] value causing regular
217     	   comparison to fail due to having a 'functional value' which cannot be compared.
218     	 *)
219  let s = cons.has_pending_watchevents |> Connection.Watch.Set.filter (fun w -> w.con == con) in
220  let pending = s |> Connection.Watch.Set.elements
221                |> List.map (fun w -> Connection.Watch.pending_watchevents w) |> List.fold_left (+) 0 in
222  Printf.sprintf "Watches with pending events: %d, pending events total: %d" (Connection.Watch.Set.cardinal s) pending
223
224let filter ~f cons =
225  let fold _ v acc = if f v then v :: acc else acc in
226  []
227  |> Hashtbl.fold fold cons.anonymous
228  |> Hashtbl.fold fold cons.domains
229
230let prevents_quit cons = filter ~f:Connection.prevents_live_update cons
231