1(*
2 * Copyright (C) 2006-2007 XenSource Ltd.
3 * Copyright (C) 2008      Citrix Ltd.
4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com>
5 * Author Thomas Gazagnaire <thomas.gazagnaire@citrix.com>
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published
9 * by the Free Software Foundation; version 2.1 only. with the special
10 * exception on linking described in file LICENSE.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 * GNU Lesser General Public License for more details.
16 *)
17let error fmt = Logging.error "transaction" fmt
18
19open Stdext
20
21let none = 0
22let test_eagain = ref false
23let do_coalesce = ref true
24
25let check_parents_perms_identical root1 root2 path =
26	let hierarch = Store.Path.get_hierarchy path in
27	let permdiff = List.fold_left (fun acc path ->
28		let n1 = Store.Path.get_node root1 path
29		and n2 = Store.Path.get_node root2 path in
30		match n1, n2 with
31		| Some n1, Some n2 ->
32			not (Perms.equiv (Store.Node.get_perms n1) (Store.Node.get_perms n2)) || acc
33		| _ ->
34			true || acc
35	) false hierarch in
36	(not permdiff)
37
38let get_lowest path1 path2 =
39	match path2 with
40	| None       -> Some path1
41	| Some path2 -> Some (Store.Path.get_common_prefix path1 path2)
42
43let test_coalesce oldroot currentroot optpath =
44	match optpath with
45	| None      -> true
46	| Some path ->
47		let oldnode = Store.Path.get_node oldroot path
48		and currentnode = Store.Path.get_node currentroot path in
49
50		match oldnode, currentnode with
51		| (Some oldnode), (Some currentnode) ->
52			if oldnode == currentnode then (
53				check_parents_perms_identical oldroot currentroot path
54			) else (
55				false
56			)
57		| None, None -> (
58			(* ok then it doesn't exists in the old version and the current version,
59			   just sneak it in as a child of the parent node if it exists, or else fail *)
60			let pnode = Store.Path.get_node currentroot (Store.Path.get_parent path) in
61			match pnode with
62			| None       -> false (* ok it doesn't exists, just bail out. *)
63			| Some pnode -> true
64			)
65		| _ ->
66			false
67
68let can_coalesce oldroot currentroot path =
69	if !do_coalesce then
70		try test_coalesce oldroot currentroot path with _ -> false
71	else
72		false
73
74type ty = No | Full of (
75	int *          (* Transaction id *)
76	Store.t *      (* Original store *)
77	Store.t        (* A pointer to the canonical store: its root changes on each transaction-commit *)
78)
79
80type t = {
81	ty: ty;
82	start_count: int64;
83	store: Store.t; (* This is the store that we change in write operations. *)
84	quota: Quota.t;
85	mutable paths: (Xenbus.Xb.Op.operation * Store.Path.t) list;
86	mutable operations: (Packet.request * Packet.response) list;
87	mutable read_lowpath: Store.Path.t option;
88	mutable write_lowpath: Store.Path.t option;
89}
90let get_id t = match t.ty with No -> none | Full (id, _, _) -> id
91
92let counter = ref 0L
93let failed_commits = ref 0L
94let failed_commits_no_culprit = ref 0L
95let reset_conflict_stats () =
96	failed_commits := 0L;
97	failed_commits_no_culprit := 0L
98
99(* Scope for optimisation: different data-structure and functions to search/filter it *)
100let short_running_txns = ref []
101
102let oldest_short_running_transaction () =
103	let rec last = function
104		| [] -> None
105		| [x] -> Some x
106		| x :: xs -> last xs
107	in last !short_running_txns
108
109let trim_short_running_transactions txn =
110	let cutoff = Unix.gettimeofday () -. !Define.conflict_max_history_seconds in
111	let keep = match txn with
112		| None -> (function (start_time, _) -> start_time >= cutoff)
113		| Some t -> (function (start_time, tx) -> start_time >= cutoff && tx != t)
114	in
115	short_running_txns := List.filter
116		keep
117		!short_running_txns
118
119let make ?(internal=false) id store =
120	let ty = if id = none then No else Full(id, Store.copy store, store) in
121	let txn = {
122		ty = ty;
123		start_count = !counter;
124		store = if id = none then store else Store.copy store;
125		quota = Quota.copy store.Store.quota;
126		paths = [];
127		operations = [];
128		read_lowpath = None;
129		write_lowpath = None;
130	} in
131	if id <> none && not internal then (
132		let now = Unix.gettimeofday () in
133		short_running_txns := (now, txn) :: !short_running_txns
134	);
135	txn
136
137let get_store t = t.store
138let get_paths t = t.paths
139
140let is_read_only t = t.paths = []
141let add_wop t ty path = t.paths <- (ty, path) :: t.paths
142let add_operation ~perm t request response =
143	if !Define.maxrequests >= 0
144		&& not (Perms.Connection.is_dom0 perm)
145		&& List.length t.operations >= !Define.maxrequests
146		then raise Quota.Limit_reached;
147	t.operations <- (request, response) :: t.operations
148let get_operations t = List.rev t.operations
149let set_read_lowpath t path = t.read_lowpath <- get_lowest path t.read_lowpath
150let set_write_lowpath t path = t.write_lowpath <- get_lowest path t.write_lowpath
151
152let path_exists t path = Store.path_exists t.store path
153
154let write t perm path value =
155	let path_exists = path_exists t path in
156	Store.write t.store perm path value;
157	if path_exists
158	then set_write_lowpath t path
159	else set_write_lowpath t (Store.Path.get_parent path);
160	add_wop t Xenbus.Xb.Op.Write path
161
162let mkdir ?(with_watch=true) t perm path =
163	Store.mkdir t.store perm path;
164	set_write_lowpath t path;
165	if with_watch then
166		add_wop t Xenbus.Xb.Op.Mkdir path
167
168let setperms t perm path perms =
169	Store.setperms t.store perm path perms;
170	set_write_lowpath t path;
171	add_wop t Xenbus.Xb.Op.Setperms path
172
173let rm t perm path =
174	Store.rm t.store perm path;
175	set_write_lowpath t (Store.Path.get_parent path);
176	add_wop t Xenbus.Xb.Op.Rm path
177
178let ls t perm path =
179	let r = Store.ls t.store perm path in
180	set_read_lowpath t path;
181	r
182
183let read t perm path =
184	let r = Store.read t.store perm path in
185	set_read_lowpath t path;
186	r
187
188let getperms t perm path =
189	let r = Store.getperms t.store perm path in
190	set_read_lowpath t path;
191	r
192
193let commit ~con t =
194	let has_write_ops = List.length t.paths > 0 in
195	let has_coalesced = ref false in
196	let has_commited =
197	match t.ty with
198	| No                         -> true
199	| Full (id, oldstore, cstore) ->       (* "cstore" meaning current canonical store *)
200		let commit_partial oldroot cstore store =
201			(* get the lowest path of the query and verify that it hasn't
202			   been modified by others transactions. *)
203			if can_coalesce oldroot (Store.get_root cstore) t.read_lowpath
204			&& can_coalesce oldroot (Store.get_root cstore) t.write_lowpath then (
205				maybe (fun p ->
206					let n = Store.get_node store p in
207
208					(* it has to be in the store, otherwise it means bugs
209					   in the lowpath registration. we don't need to handle none. *)
210					maybe (fun n -> Store.set_node cstore p n t.quota store.Store.quota) n;
211					Logging.write_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p);
212				) t.write_lowpath;
213				maybe (fun p ->
214					Logging.read_coalesce ~tid:(get_id t) ~con (Store.Path.to_string p)
215					) t.read_lowpath;
216				has_coalesced := true;
217				Store.incr_transaction_coalesce cstore;
218				true
219			) else (
220				(* cannot do anything simple, just discard the queries,
221				   and the client need to redo it later *)
222				Store.incr_transaction_abort cstore;
223				false
224			)
225			in
226		let try_commit oldroot cstore store =
227			if oldroot == Store.get_root cstore then (
228				(* move the new root to the current store, if the oldroot
229				   has not been modified *)
230				if has_write_ops then (
231					Store.set_root cstore (Store.get_root store);
232					Store.set_quota cstore (Store.get_quota store)
233				);
234				true
235			) else
236				(* we try a partial commit if possible *)
237				commit_partial oldroot cstore store
238			in
239		if !test_eagain && Random.int 3 = 0 then
240			false
241		else
242			try_commit (Store.get_root oldstore) cstore t.store
243		in
244	if has_commited && has_write_ops then
245		Disk.write t.store;
246	if not has_commited
247	then Logging.conflict ~tid:(get_id t) ~con
248	else if not !has_coalesced
249	then Logging.commit ~tid:(get_id t) ~con;
250	has_commited
251