(*
 * Copyright (C) 2006-2007 XenSource Ltd.
 * Copyright (C) 2008 Citrix Ltd.
 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
 * Author Thomas Gazagnaire <thomas.gazagnaire@eu.citrix.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation; version 2.1 only. with the special
 * exception on linking described in file LICENSE.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *)

(* Logging helper for this module. It is shadowed by the dump function of
   the same name defined further down, so only the definitions located
   between the two can use it. *)
let debug fmt = Logging.debug "connections" fmt

(* Registry of every live connection plus the global watch index. *)
type t = {
  (* socket connections, keyed by the file descriptor of their Xenbus
     connection *)
  anonymous: (Unix.file_descr, Connection.t) Hashtbl.t;
  (* domain connections, keyed by [Domain.get_id] *)
  domains: (int, Connection.t) Hashtbl.t;
  (* the same domain connections, keyed by [Domain.get_local_port] *)
  ports: (Xeneventchn.t, Connection.t) Hashtbl.t;
  (* registered watches, indexed by the key built by [key_of_str] *)
  mutable watches: Connection.watch list Trie.t;
  (* watches with queued events; this file only prunes the set, insertion
     presumably happens elsewhere -- TODO confirm *)
  mutable has_pending_watchevents: Connection.Watch.Set.t
}

(* Build an empty registry. *)
let create () =
  let table () = Hashtbl.create 37 in
  {
    anonymous = table ();
    domains = table ();
    ports = table ();
    watches = Trie.create ();
    has_pending_watchevents = Connection.Watch.Set.empty;
  }

(* Queue limits handed to every new Xenbus connection, read from the
   current values of the [Define] settings. *)
let get_capacity () =
  (* not multiplied by maxwatch on purpose: 2nd queue in watch itself! *)
  let maxoutstanding = !Define.maxoutstanding in
  let maxwatchevents = !Define.maxwatchevents in
  { Xenbus.Xb.maxoutstanding; maxwatchevents }

(* Wrap [fd] in a Xenbus connection and register it as an anonymous
   (domain-less) connection. *)
let add_anonymous cons fd =
  let xbcon = Xenbus.Xb.open_fd fd ~capacity:(get_capacity ()) in
  let con = Connection.create xbcon None in
  let key = Xenbus.Xb.get_fd xbcon in
  Hashtbl.add cons.anonymous key con

(* Open an mmap-backed Xenbus connection on the interface of [dom], with a
   callback that calls [Domain.notify dom], and register the resulting
   connection under both the domain id and the local port. *)
let add_domain cons dom =
  let notify () = Domain.notify dom in
  let xbcon =
    Xenbus.Xb.open_mmap ~capacity:(get_capacity ()) (Domain.get_interface dom) notify
  in
  let con = Connection.create xbcon (Some dom) in
  Hashtbl.add cons.domains (Domain.get_id dom) con;
  Hashtbl.add cons.ports (Domain.get_local_port dom) con

(* Compute the descriptor lists of the anonymous connections: the first
   list holds those that can take input, the second those that have output
   queued. Connections rejected by [only_if] are ignored; every connection
   is accepted by default. *)
let select ?(only_if = (fun _ -> true)) cons =
  let collect _ con ((ins, outs) as acc) =
    if not (only_if con) then acc
    else begin
      let fd = Connection.get_fd con in
      let ins = if Connection.can_input con then fd :: ins else ins in
      let outs = if Connection.has_output con then fd :: outs else outs in
      (ins, outs)
    end
  in
  Hashtbl.fold collect cons.anonymous ([], [])

(* Anonymous connection bound to [fd].
   Raises [Not_found] when there is none. *)
let find cons fd =
  Hashtbl.find cons.anonymous fd

(* Domain connection registered under [domid].
   Raises [Not_found] when there is none. *)
let find_domain cons domid =
  Hashtbl.find cons.domains domid

(* Domain connection registered under a local port.
   Raises [Not_found] when there is none. *)
let find_domain_by_port cons =
  Hashtbl.find cons.ports

(* Drop from [watches] every watch owned by [con] (physical comparison).
   Returns [None] when nothing is left, the shape expected by [Trie.map]
   in [del_watches]. *)
let del_watches_of_con con watches =
  let owned_by_other w = Connection.get_con w != con in
  match List.filter owned_by_other watches with
  | [] -> None
  | _ :: _ as remaining -> Some remaining

(* Forget every watch of [con]: in the connection itself, in the global
   trie, and in the set of watches with pending events. *)
let del_watches cons con =
  Connection.del_watches con;
  cons.watches <- Trie.map (del_watches_of_con con) cons.watches;
  cons.has_pending_watchevents <-
    Connection.Watch.Set.filter
      (fun w -> Connection.get_con w != con)
      cons.has_pending_watchevents

(* Unregister and close an anonymous connection. Best effort: any
   exception is logged and swallowed. *)
let del_anonymous cons con =
  try
    let fd = Connection.get_fd con in
    Hashtbl.remove cons.anonymous fd;
    del_watches cons con;
    Connection.close con
  with e ->
    debug "del anonymous %s" (Printexc.to_string e)

(* Unregister and close the connection of domain [id], including its port
   binding when the connection carries a domain. Best effort: any
   exception (an unknown [id] included) is logged and swallowed. *)
let del_domain cons id =
  try
    let con = find_domain cons id in
    Hashtbl.remove cons.domains id;
    begin match Connection.get_domain con with
      | None -> ()
      | Some dom -> Hashtbl.remove cons.ports (Domain.get_local_port dom)
    end;
    del_watches cons con;
    Connection.close con
  with e ->
    debug "del domain %u: %s" id (Printexc.to_string e)

(* Apply [fct] to every domain connection. *)
let iter_domains cons fct =
  let visit _ con = fct con in
  Hashtbl.iter visit cons.domains

(* Apply [fct] to every anonymous connection. *)
let iter_anonymous cons fct =
  let visit _ con = fct con in
  Hashtbl.iter visit cons.anonymous

(* Apply [fct] to every connection: domains first, then anonymous ones. *)
let iter cons fct =
  iter_domains cons fct;
  iter_anonymous cons fct

(* Domain connections for which [Connection.has_more_work] holds.
   Anonymous connections are not examined. *)
let has_more_work cons =
  let collect _id con busy =
    if Connection.has_more_work con then con :: busy else busy
  in
  Hashtbl.fold collect cons.domains []

(* Trie key for a watch path given as a string. A path starting with '@'
   is used whole as a one-element key; any other path is parsed by
   [Store.Path] and its components are prefixed with an empty string.
   Indexing an empty [path] raises [Invalid_argument]. *)
let key_of_str path =
  match path.[0] with
  | '@' -> [path]
  | _ -> "" :: (path |> Store.Path.of_string |> Store.Path.to_string_list)

(* Trie key for an already parsed store path. *)
let key_of_path path =
  let components = Store.Path.to_string_list path in
  "" :: components

(* Register a watch of [con] on [path] with [token], both in the
   connection and in the global trie. Returns the new watch. *)
let add_watch cons con path token =
  let apath = Connection.get_watch_path con path in
  (* fail on invalid paths early by calling key_of_str before adding watch *)
  let key = key_of_str apath in
  let watch = Connection.add_watch con (path, apath) token in
  let existing =
    if Trie.mem cons.watches key then Trie.find cons.watches key else []
  in
  cons.watches <- Trie.set cons.watches key (watch :: existing);
  watch

(* Remove the watch of [con] identified by [path] and [token] from the
   connection and from the global trie; the trie entry is unset once its
   list becomes empty. Returns the removed watch. *)
let del_watch cons con path token =
  let apath, watch = Connection.del_watch con path token in
  let key = key_of_str apath in
  let remaining = Utils.list_remove watch (Trie.find cons.watches key) in
  cons.watches <-
    (match remaining with
     | [] -> Trie.unset cons.watches key
     | _ :: _ -> Trie.set cons.watches key remaining);
  watch

(* Fire the watches concerned by a change at [path].

   path is absolute

   The watches found by [Trie.iter_path] for the key of [path] are fired
   with the string form of [path]. When [recurse] is set, every watch
   stored in the sub-trie at that key is fired as well, through
   [Connection.fire_single_watch]. *)
let fire_watches ?oldroot source root cons path recurse =
  let key = key_of_path path in
  let path = Store.Path.to_string path in
  let roots = (oldroot, root) in
  let notify_node _ node =
    match node with
    | Some watches ->
      List.iter (fun w -> Connection.fire_watch source roots w path) watches
    | None -> ()
  in
  let notify_subtree _x node =
    match node with
    | Some watches ->
      List.iter (Connection.fire_single_watch source roots) watches
    | None -> ()
  in
  Trie.iter_path notify_node cons.watches key;
  if recurse then
    Trie.iter notify_subtree (Trie.sub cons.watches key)

(* Run [Connection.Watch.flush_events] on every watch of the pending set,
   keeping only those for which it returns [true], then flush the watch
   events of [con] as a source. *)
let send_watchevents cons con =
  let still_pending =
    Connection.Watch.Set.filter
      Connection.Watch.flush_events
      cons.has_pending_watchevents
  in
  cons.has_pending_watchevents <- still_pending;
  Connection.source_flush_watchevents con

(* Fire, on every connection, the watches it holds on the special path
   [specpath], using the connection of domain 0 as the source.
   Raises [Not_found] when domain 0 has no connection. *)
let fire_spec_watches root cons specpath =
  let source = find_domain cons 0 in
  let fire_for con =
    let watches = Connection.get_watches con specpath in
    List.iter (Connection.fire_single_watch source (None, root)) watches
  in
  iter cons fire_for

(* Set [target_domain] as the target of the connection of [domain].
   Raises [Not_found] when [domain] has no connection. *)
let set_target cons domain target_domain =
  Connection.set_target (find_domain cons domain) target_domain

(* Sum of [Connection.number_of_transactions] over all connections. *)
let number_of_transactions cons =
  let total = ref 0 in
  iter cons (fun con ->
      total := Connection.number_of_transactions con + !total);
  !total

(* Aggregate statistics as
   (anonymous connections, their operations, their watches,
    domain connections, their operations, their watches). *)
let stats cons =
  (* Sum the two components of [Connection.stats] over one family of
     connections; returns (operations, watches). *)
  let tally iterate =
    let ops = ref 0 and watches = ref 0 in
    iterate cons (fun con ->
        let con_watches, con_ops = Connection.stats con in
        ops := !ops + con_ops;
        watches := !watches + con_watches);
    (!ops, !watches)
  in
  let ops_anon, watches_anon = tally iter_anonymous in
  let ops_dom, watches_dom = tally iter_domains in
  (Hashtbl.length cons.anonymous, ops_anon, watches_anon,
   Hashtbl.length cons.domains, ops_dom, watches_dom)

(* Concatenation of [Connection.debug] for every connection, domains
   before anonymous ones. From here on [debug] names this function and no
   longer the logging helper defined at the top of the file. *)
let debug cons =
  let dump table =
    Hashtbl.fold (fun _ con lines -> Connection.debug con :: lines) table []
  in
  let anonymous = dump cons.anonymous in
  let domains = dump cons.domains in
  String.concat "" (domains @ anonymous)

(* Summary line giving how many watches of [con] have pending events and
   how many events that makes in total. *)
let debug_watchevents cons con =
  (* == (physical equality) has to be used here because w.con.xb.backend
     might contain a [unit->unit] value causing regular comparison to fail
     due to having a 'functional value' which cannot be compared. *)
  let mine =
    Connection.Watch.Set.filter
      (fun w -> w.con == con)
      cons.has_pending_watchevents
  in
  let pending =
    Connection.Watch.Set.fold
      (fun w sum -> sum + Connection.Watch.pending_watchevents w)
      mine 0
  in
  Printf.sprintf "Watches with pending events: %d, pending events total: %d"
    (Connection.Watch.Set.cardinal mine) pending

(* Connections, anonymous or domain, that satisfy [f]. *)
let filter ~f cons =
  let keep _ con selected = if f con then con :: selected else selected in
  let from_anonymous = Hashtbl.fold keep cons.anonymous [] in
  Hashtbl.fold keep cons.domains from_anonymous

(* Connections for which [Connection.prevents_live_update] holds. *)
let prevents_quit cons =
  filter cons ~f:Connection.prevents_live_update