1(* 2 * Copyright (C) 2006-2007 XenSource Ltd. 3 * Copyright (C) 2008 Citrix Ltd. 4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com> 5 * Author Thomas Gazagnaire <thomas.gazagnaire@citrix.com> 6 * 7 * This program is free software; you can redistribute it and/or modify 8 * it under the terms of the GNU Lesser General Public License as published 9 * by the Free Software Foundation; version 2.1 only. with the special 10 * exception on linking described in file LICENSE. 11 * 12 * This program is distributed in the hope that it will be useful, 13 * but WITHOUT ANY WARRANTY; without even the implied warranty of 14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15 * GNU Lesser General Public License for more details. 16 *) 17let error fmt = Logging.error "transaction" fmt 18 19open Stdext 20 21let none = 0 22let test_eagain = ref false 23let do_coalesce = ref true 24 25let check_parents_perms_identical root1 root2 path = 26 let hierarch = Store.Path.get_hierarchy path in 27 let permdiff = List.fold_left (fun acc path -> 28 let n1 = Store.Path.get_node root1 path 29 and n2 = Store.Path.get_node root2 path in 30 match n1, n2 with 31 | Some n1, Some n2 -> 32 not (Perms.equiv (Store.Node.get_perms n1) (Store.Node.get_perms n2)) || acc 33 | _ -> 34 true || acc 35 ) false hierarch in 36 (not permdiff) 37 38let get_lowest path1 path2 = 39 match path2 with 40 | None -> Some path1 41 | Some path2 -> Some (Store.Path.get_common_prefix path1 path2) 42 43let test_coalesce oldroot currentroot optpath = 44 match optpath with 45 | None -> true 46 | Some path -> 47 let oldnode = Store.Path.get_node oldroot path 48 and currentnode = Store.Path.get_node currentroot path in 49 50 match oldnode, currentnode with 51 | (Some oldnode), (Some currentnode) -> 52 if oldnode == currentnode then ( 53 check_parents_perms_identical oldroot currentroot path 54 ) else ( 55 false 56 ) 57 | None, None -> ( 58 (* ok then it doesn't exists in the old version and the current version, 59 just sneak it in as a child of the parent node if it exists, or else fail *) 60 let pnode = Store.Path.get_node currentroot (Store.Path.get_parent path) in 61 match pnode with 62 | None -> false (* ok it doesn't exists, just bail out. *) 63 | Some pnode -> true 64 ) 65 | _ -> 66 false 67 68let can_coalesce oldroot currentroot path = 69 if !do_coalesce then 70 try test_coalesce oldroot currentroot path with _ -> false 71 else 72 false 73 74type ty = No | Full of ( 75 int * (* Transaction id *) 76 Store.t * (* Original store *) 77 Store.t (* A pointer to the canonical store: its root changes on each transaction-commit *) 78) 79 80type t = { 81 ty: ty; 82 start_count: int64; 83 store: Store.t; (* This is the store that we change in write operations. *) 84 quota: Quota.t; 85 mutable paths: (Xenbus.Xb.Op.operation * Store.Path.t) list; 86 mutable operations: (Packet.request * Packet.response) list; 87 mutable read_lowpath: Store.Path.t option; 88 mutable write_lowpath: Store.Path.t option; 89} 90let get_id t = match t.ty with No -> none | Full (id, _, _) -> id 91 92let counter = ref 0L 93let failed_commits = ref 0L 94let failed_commits_no_culprit = ref 0L 95let reset_conflict_stats () = 96 failed_commits := 0L; 97 failed_commits_no_culprit := 0L 98 99(* Scope for optimisation: different data-structure and functions to search/filter it *) 100let short_running_txns = ref [] 101 102let oldest_short_running_transaction () = 103 let rec last = function 104 | [] -> None 105 | [x] -> Some x 106 | x :: xs -> last xs 107 in last !short_running_txns 108 109let trim_short_running_transactions txn = 110 let cutoff = Unix.gettimeofday () -. !Define.conflict_max_history_seconds in 111 let keep = match txn with 112 | None -> (function (start_time, _) -> start_time >= cutoff) 113 | Some t -> (function (start_time, tx) -> start_time >= cutoff && tx != t) 114 in 115 short_running_txns := List.filter 116 keep 117 !short_running_txns 118 119let make ?(internal=false) id store = 120 let ty = if id = none then No else Full(id, Store.copy store, store) in 121 let txn = { 122 ty = ty; 123 start_count = !counter; 124 store = if id = none then store else Store.copy store; 125 quota = Quota.copy store.Store.quota; 126 paths = []; 127 operations = []; 128 read_lowpath = None; 129 write_lowpath = None; 130 } in 131 if id <> none && not internal then ( 132 let now = Unix.gettimeofday () in 133 short_running_txns := (now, txn) :: !short_running_txns 134 ); 135 txn 136 137let get_store t = t.store 138let get_paths t = t.paths 139 140let is_read_only t = t.paths = [] 141let add_wop t ty path = t.paths <- (ty, path) :: t.paths 142let add_operation ~perm t request response = 143 if !Define.maxrequests >= 0 144 && not (Perms.Connection.is_dom0 perm) 145 && List.length t.operations >= !Define.maxrequests 146 then raise Quota.Limit_reached; 147 t.operations <- (request, response) :: t.operations 148let get_operations t = List.rev t.operations 149let set_read_lowpath t path = t.read_lowpath <- get_lowest path t.read_lowpath 150let set_write_lowpath t path = t.write_lowpath <- get_lowest path t.write_lowpath 151 152let path_exists t path = Store.path_exists t.store path 153 154let write t perm path value = 155 let path_exists = path_exists t path in 156 Store.write t.store perm path value; 157 if path_exists 158 then set_write_lowpath t path 159 else set_write_lowpath t (Store.Path.get_parent path); 160 add_wop t Xenbus.Xb.Op.Write path 161 162let mkdir ?(with_watch=true) t perm path = 163 Store.mkdir t.store perm path; 164 set_write_lowpath t path; 165 if with_watch then 166 add_wop t Xenbus.Xb.Op.Mkdir path 167 168let setperms t perm path perms = 169 Store.setperms t.store perm path perms; 170 set_write_lowpath t path; 171 add_wop t Xenbus.Xb.Op.Setperms path 172 173let rm t perm path = 174 Store.rm t.store perm path; 175 set_write_lowpath t (Store.Path.get_parent path); 176 add_wop t Xenbus.Xb.Op.Rm path 177 178let ls t perm path = 179 let r = Store.ls t.store perm path in 180 set_read_lowpath t path; 181 r 182 183let read t perm path = 184 let r = Store.read t.store perm path in 185 set_read_lowpath t path; 186 r 187 188let getperms t perm path = 189 let r = Store.getperms t.store perm path in 190 set_read_lowpath t path; 191 r 192 193let commit ~con t = 194 let has_write_ops = List.length t.paths > 0 in 195 let has_coalesced = ref false in 196 let has_commited = 197 match t.ty with 198 | No -> true 199 | Full (id, oldstore, cstore) -> (* "cstore" meaning current canonical store *) 200 let commit_partial oldroot cstore store = 201 (* get the lowest path of the query and verify that it hasn't 202 been modified by others transactions. *) 203 if can_coalesce oldroot (Store.get_root cstore) t.read_lowpath 204 && can_coalesce oldroot (Store.get_root cstore) t.write_lowpath then ( 205 maybe (fun p -> 206 let n = Store.get_node store p in 207 208 (* it has to be in the store, otherwise it means bugs 209 in the lowpath registration. we don't need to handle none. *) 210 maybe (fun n -> Store.set_node cstore p n t.quota store.Store.quota) n; 211 Logging.write_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p); 212 ) t.write_lowpath; 213 maybe (fun p -> 214 Logging.read_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p) 215 ) t.read_lowpath; 216 has_coalesced := true; 217 Store.incr_transaction_coalesce cstore; 218 true 219 ) else ( 220 (* cannot do anything simple, just discard the queries, 221 and the client need to redo it later *) 222 Store.incr_transaction_abort cstore; 223 false 224 ) 225 in 226 let try_commit oldroot cstore store = 227 if oldroot == Store.get_root cstore then ( 228 (* move the new root to the current store, if the oldroot 229 has not been modified *) 230 if has_write_ops then ( 231 Store.set_root cstore (Store.get_root store); 232 Store.set_quota cstore (Store.get_quota store) 233 ); 234 true 235 ) else 236 (* we try a partial commit if possible *) 237 commit_partial oldroot cstore store 238 in 239 if !test_eagain && Random.int 3 = 0 then 240 false 241 else 242 try_commit (Store.get_root oldstore) cstore t.store 243 in 244 if has_commited && has_write_ops then 245 Disk.write t.store; 246 if not has_commited 247 then Logging.conflict ~tid:(get_id t) ~con 248 else if not !has_coalesced 249 then Logging.commit ~tid:(get_id t) ~con; 250 has_commited 251