1(* 2 * Copyright (C) 2006-2007 XenSource Ltd. 3 * Copyright (C) 2008 Citrix Ltd. 4 * Author Vincent Hanquez <vincent.hanquez@eu.citrix.com> 5 * 6 * This program is free software; you can redistribute it and/or modify 7 * it under the terms of the GNU Lesser General Public License as published 8 * by the Free Software Foundation; version 2.1 only. with the special 9 * exception on linking described in file LICENSE. 10 * 11 * This program is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU Lesser General Public License for more details. 15 *) 16 17let error fmt = Logging.error "process" fmt 18let info fmt = Logging.info "process" fmt 19let debug fmt = Logging.debug "process" fmt 20 21open Printf 22open Stdext 23 24exception Transaction_again 25exception Transaction_nested 26exception Domain_not_match 27exception Invalid_Cmd_Args 28 29(* This controls the do_debug fn in this module, not the debug logging-function. *) 30let allow_debug = ref false 31 32let c_int_of_string s = 33 let v = ref 0 in 34 let is_digit c = c >= '0' && c <= '9' in 35 let len = String.length s in 36 let i = ref 0 in 37 while !i < len && not (is_digit s.[!i]) do incr i done; 38 while !i < len && is_digit s.[!i] 39 do 40 let x = (Char.code s.[!i]) - (Char.code '0') in 41 v := !v * 10 + x; 42 incr i 43 done; 44 !v 45 46(* when we don't want a limit, apply a max limit of 8 arguments. 47 no arguments take more than 3 currently, which is pointless to split 48 more than needed. *) 49let split limit c s = 50 let limit = match limit with None -> 8 | Some x -> x in 51 String.split ~limit c s 52 53let split_one_path data con = 54 let args = split (Some 2) '\000' data in 55 match args with 56 | path :: "" :: [] -> Store.Path.create path (Connection.get_path con) 57 | _ -> raise Invalid_Cmd_Args 58 59let process_watch ops cons = 60 let do_op_watch op cons = 61 let recurse = match (fst op) with 62 | Xenbus.Xb.Op.Write -> false 63 | Xenbus.Xb.Op.Mkdir -> false 64 | Xenbus.Xb.Op.Rm -> true 65 | Xenbus.Xb.Op.Setperms -> false 66 | _ -> raise (Failure "huh ?") in 67 Connections.fire_watches cons (snd op) recurse in 68 List.iter (fun op -> do_op_watch op cons) ops 69 70let create_implicit_path t perm path = 71 let dirname = Store.Path.get_parent path in 72 if not (Transaction.path_exists t dirname) then ( 73 let rec check_path p = 74 match p with 75 | [] -> [] 76 | h :: l -> 77 if Transaction.path_exists t h then 78 check_path l 79 else 80 p in 81 let ret = check_path (List.tl (Store.Path.get_hierarchy dirname)) in 82 List.iter (fun s -> Transaction.mkdir ~with_watch:false t perm s) ret 83 ) 84 85(* packets *) 86let do_debug con t domains cons data = 87 if not (Connection.is_dom0 con) && not !allow_debug 88 then None 89 else try match split None '\000' data with 90 | "print" :: msg :: _ -> 91 Logging.xb_op ~tid:0 ~ty:Xenbus.Xb.Op.Debug ~con:"=======>" msg; 92 None 93 | "quota" :: domid :: _ -> 94 let domid = int_of_string domid in 95 let quota = (Store.get_quota t.Transaction.store) in 96 Some (Quota.to_string quota domid ^ "\000") 97 | "watches" :: _ -> 98 let watches = Connections.debug cons in 99 Some (watches ^ "\000") 100 | "mfn" :: domid :: _ -> 101 let domid = int_of_string domid in 102 let con = Connections.find_domain cons domid in 103 may (fun dom -> Printf.sprintf "%nd\000" (Domain.get_mfn dom)) (Connection.get_domain con) 104 | _ -> None 105 with _ -> None 106 107let do_directory con t domains cons data = 108 let path = split_one_path data con in 109 let entries = Transaction.ls t (Connection.get_perm con) path in 110 if List.length entries > 0 then 111 (Utils.join_by_null entries) ^ "\000" 112 else 113 "" 114 115let do_read con t domains cons data = 116 let path = split_one_path data con in 117 Transaction.read t (Connection.get_perm con) path 118 119let do_getperms con t domains cons data = 120 let path = split_one_path data con in 121 let perms = Transaction.getperms t (Connection.get_perm con) path in 122 Perms.Node.to_string perms ^ "\000" 123 124let do_getdomainpath con t domains cons data = 125 let domid = 126 match (split None '\000' data) with 127 | domid :: "" :: [] -> c_int_of_string domid 128 | _ -> raise Invalid_Cmd_Args 129 in 130 sprintf "/local/domain/%u\000" domid 131 132let do_write con t domains cons data = 133 let path, value = 134 match (split (Some 2) '\000' data) with 135 | path :: value :: [] -> Store.Path.create path (Connection.get_path con), value 136 | _ -> raise Invalid_Cmd_Args 137 in 138 create_implicit_path t (Connection.get_perm con) path; 139 Transaction.write t (Connection.get_perm con) path value 140 141let do_mkdir con t domains cons data = 142 let path = split_one_path data con in 143 create_implicit_path t (Connection.get_perm con) path; 144 try 145 Transaction.mkdir t (Connection.get_perm con) path 146 with 147 Define.Already_exist -> () 148 149let do_rm con t domains cons data = 150 let path = split_one_path data con in 151 try 152 Transaction.rm t (Connection.get_perm con) path 153 with 154 Define.Doesnt_exist -> () 155 156let do_setperms con t domains cons data = 157 let path, perms = 158 match (split (Some 2) '\000' data) with 159 | path :: perms :: _ -> 160 Store.Path.create path (Connection.get_path con), 161 (Perms.Node.of_string perms) 162 | _ -> raise Invalid_Cmd_Args 163 in 164 Transaction.setperms t (Connection.get_perm con) path perms 165 166let do_error con t domains cons data = 167 raise Define.Unknown_operation 168 169let do_isintroduced con t domains cons data = 170 let domid = 171 match (split None '\000' data) with 172 | domid :: _ -> int_of_string domid 173 | _ -> raise Invalid_Cmd_Args 174 in 175 if domid = Define.domid_self || Domains.exist domains domid then "T\000" else "F\000" 176 177(* only in xen >= 4.2 *) 178let do_reset_watches con t domains cons data = 179 Connection.del_watches con; 180 Connection.del_transactions con 181 182(* only in >= xen3.3 *) 183let do_set_target con t domains cons data = 184 if not (Connection.is_dom0 con) 185 then raise Define.Permission_denied; 186 match split None '\000' data with 187 | [ domid; target_domid; "" ] -> Connections.set_target cons (c_int_of_string domid) (c_int_of_string target_domid) 188 | _ -> raise Invalid_Cmd_Args 189 190(*------------- Generic handling of ty ------------------*) 191let send_response ty con t rid response = 192 match response with 193 | Packet.Ack f -> 194 Connection.send_ack con (Transaction.get_id t) rid ty; 195 (* Now do any necessary follow-up actions *) 196 f () 197 | Packet.Reply ret -> 198 Connection.send_reply con (Transaction.get_id t) rid ty ret 199 | Packet.Error e -> 200 Connection.send_error con (Transaction.get_id t) rid e 201 202let reply_ack fct con t doms cons data = 203 fct con t doms cons data; 204 Packet.Ack (fun () -> 205 if Transaction.get_id t = Transaction.none then 206 process_watch (Transaction.get_paths t) cons 207 ) 208 209let reply_data fct con t doms cons data = 210 let ret = fct con t doms cons data in 211 Packet.Reply ret 212 213let reply_data_or_ack fct con t doms cons data = 214 match fct con t doms cons data with 215 | Some ret -> Packet.Reply ret 216 | None -> Packet.Ack (fun () -> ()) 217 218let reply_none fct con t doms cons data = 219 (* let the function reply *) 220 fct con t doms cons data 221 222(* Functions for 'simple' operations that cannot be part of a transaction *) 223let function_of_type_simple_op ty = 224 match ty with 225 | Xenbus.Xb.Op.Debug 226 | Xenbus.Xb.Op.Watch 227 | Xenbus.Xb.Op.Unwatch 228 | Xenbus.Xb.Op.Transaction_start 229 | Xenbus.Xb.Op.Transaction_end 230 | Xenbus.Xb.Op.Introduce 231 | Xenbus.Xb.Op.Release 232 | Xenbus.Xb.Op.Isintroduced 233 | Xenbus.Xb.Op.Resume 234 | Xenbus.Xb.Op.Set_target 235 | Xenbus.Xb.Op.Reset_watches 236 | Xenbus.Xb.Op.Invalid -> error "called function_of_type_simple_op on operation %s" (Xenbus.Xb.Op.to_string ty); 237 raise (Invalid_argument (Xenbus.Xb.Op.to_string ty)) 238 | Xenbus.Xb.Op.Directory -> reply_data do_directory 239 | Xenbus.Xb.Op.Read -> reply_data do_read 240 | Xenbus.Xb.Op.Getperms -> reply_data do_getperms 241 | Xenbus.Xb.Op.Getdomainpath -> reply_data do_getdomainpath 242 | Xenbus.Xb.Op.Write -> reply_ack do_write 243 | Xenbus.Xb.Op.Mkdir -> reply_ack do_mkdir 244 | Xenbus.Xb.Op.Rm -> reply_ack do_rm 245 | Xenbus.Xb.Op.Setperms -> reply_ack do_setperms 246 | _ -> reply_ack do_error 247 248let input_handle_error ~cons ~doms ~fct ~con ~t ~req = 249 let reply_error e = 250 Packet.Error e in 251 try 252 fct con t doms cons req.Packet.data 253 with 254 | Define.Invalid_path -> reply_error "EINVAL" 255 | Define.Already_exist -> reply_error "EEXIST" 256 | Define.Doesnt_exist -> reply_error "ENOENT" 257 | Define.Lookup_Doesnt_exist s -> reply_error "ENOENT" 258 | Define.Permission_denied -> reply_error "EACCES" 259 | Not_found -> reply_error "ENOENT" 260 | Invalid_Cmd_Args -> reply_error "EINVAL" 261 | Invalid_argument i -> reply_error "EINVAL" 262 | Transaction_again -> reply_error "EAGAIN" 263 | Transaction_nested -> reply_error "EBUSY" 264 | Domain_not_match -> reply_error "EINVAL" 265 | Quota.Limit_reached -> reply_error "EQUOTA" 266 | Quota.Data_too_big -> reply_error "E2BIG" 267 | Quota.Transaction_opened -> reply_error "EQUOTA" 268 | (Failure "int_of_string") -> reply_error "EINVAL" 269 | Define.Unknown_operation -> reply_error "ENOSYS" 270 271let write_access_log ~ty ~tid ~con ~data = 272 Logging.xb_op ~ty ~tid ~con data 273 274let write_answer_log ~ty ~tid ~con ~data = 275 Logging.xb_answer ~ty ~tid ~con data 276 277let write_response_log ~ty ~tid ~con ~response = 278 match response with 279 | Packet.Ack _ -> write_answer_log ~ty ~tid ~con ~data:"" 280 | Packet.Reply x -> write_answer_log ~ty ~tid ~con ~data:x 281 | Packet.Error e -> write_answer_log ~ty:(Xenbus.Xb.Op.Error) ~tid ~con ~data:e 282 283let record_commit ~con ~tid ~before ~after = 284 let inc r = r := Int64.add 1L !r in 285 let finish_count = inc Transaction.counter; !Transaction.counter in 286 History.push {History.con=con; tid=tid; before=before; after=after; finish_count=finish_count} 287 288(* Replay a stored transaction against a fresh store, check the responses are 289 all equivalent: if so, commit the transaction. Otherwise send the abort to 290 the client. *) 291let transaction_replay c t doms cons = 292 match t.Transaction.ty with 293 | Transaction.No -> 294 error "attempted to replay a non-full transaction"; 295 false 296 | Transaction.Full(id, oldstore, cstore) -> 297 let tid = Connection.start_transaction c cstore in 298 let replay_t = Transaction.make ~internal:true tid cstore in 299 let con = sprintf "r(%d):%s" id (Connection.get_domstr c) in 300 301 let perform_exn ~wlog txn (request, response) = 302 if wlog then write_access_log ~ty:request.Packet.ty ~tid ~con ~data:request.Packet.data; 303 let fct = function_of_type_simple_op request.Packet.ty in 304 let response' = input_handle_error ~cons ~doms ~fct ~con:c ~t:txn ~req:request in 305 if wlog then write_response_log ~ty:request.Packet.ty ~tid ~con ~response:response'; 306 if not(Packet.response_equal response response') then raise Transaction_again 307 in 308 finally 309 (fun () -> 310 try 311 Logging.start_transaction ~con ~tid; 312 List.iter (perform_exn ~wlog:true replay_t) (Transaction.get_operations t); (* May throw EAGAIN *) 313 314 Logging.end_transaction ~con ~tid; 315 Transaction.commit ~con replay_t 316 with 317 | Transaction_again -> ( 318 Transaction.failed_commits := Int64.add !Transaction.failed_commits 1L; 319 let victim_domstr = Connection.get_domstr c in 320 debug "Apportioning blame for EAGAIN in txn %d, domain=%s" id victim_domstr; 321 let punish guilty_con = 322 debug "Blaming domain %s for conflict with domain %s txn %d" 323 (Connection.get_domstr guilty_con) victim_domstr id; 324 Connection.decr_conflict_credit doms guilty_con 325 in 326 let judge_and_sentence hist_rec = ( 327 let can_apply_on store = ( 328 let store = Store.copy store in 329 let trial_t = Transaction.make ~internal:true Transaction.none store in 330 try List.iter (perform_exn ~wlog:false trial_t) (Transaction.get_operations t); 331 true 332 with Transaction_again -> false 333 ) in 334 if can_apply_on hist_rec.History.before 335 && not (can_apply_on hist_rec.History.after) 336 then (punish hist_rec.History.con; true) 337 else false 338 ) in 339 let guilty_cons = History.filter_connections ~ignore:c ~since:t.Transaction.start_count ~f:judge_and_sentence in 340 if Hashtbl.length guilty_cons = 0 then ( 341 debug "Found no culprit for conflict in %s: must be self or not in history." con; 342 Transaction.failed_commits_no_culprit := Int64.add !Transaction.failed_commits_no_culprit 1L 343 ); 344 false 345 ) 346 | e -> 347 info "transaction_replay %d caught: %s" tid (Printexc.to_string e); 348 false 349 ) 350 (fun () -> 351 Connection.end_transaction c tid None 352 ) 353 354let do_watch con t domains cons data = 355 let (node, token) = 356 match (split None '\000' data) with 357 | [node; token; ""] -> node, token 358 | _ -> raise Invalid_Cmd_Args 359 in 360 let watch = Connections.add_watch cons con node token in 361 Packet.Ack (fun () -> Connection.fire_single_watch watch) 362 363let do_unwatch con t domains cons data = 364 let (node, token) = 365 match (split None '\000' data) with 366 | [node; token; ""] -> node, token 367 | _ -> raise Invalid_Cmd_Args 368 in 369 Connections.del_watch cons con node token 370 371let do_transaction_start con t domains cons data = 372 if Transaction.get_id t <> Transaction.none then 373 raise Transaction_nested; 374 let store = Transaction.get_store t in 375 string_of_int (Connection.start_transaction con store) ^ "\000" 376 377let do_transaction_end con t domains cons data = 378 let commit = 379 match (split None '\000' data) with 380 | "T" :: _ -> true 381 | "F" :: _ -> false 382 | x :: _ -> raise (Invalid_argument x) 383 | _ -> raise Invalid_Cmd_Args 384 in 385 let commit = commit && not (Transaction.is_read_only t) in 386 let success = 387 let commit = if commit then Some (fun con trans -> transaction_replay con trans domains cons) else None in 388 History.end_transaction t con (Transaction.get_id t) commit in 389 if not success then 390 raise Transaction_again; 391 if commit then begin 392 process_watch (List.rev (Transaction.get_paths t)) cons; 393 match t.Transaction.ty with 394 | Transaction.No -> 395 () (* no need to record anything *) 396 | Transaction.Full(id, oldstore, cstore) -> 397 record_commit ~con ~tid:id ~before:oldstore ~after:cstore 398 end 399 400let do_introduce con t domains cons data = 401 if not (Connection.is_dom0 con) 402 then raise Define.Permission_denied; 403 let (domid, mfn, port) = 404 match (split None '\000' data) with 405 | domid :: mfn :: port :: _ -> 406 int_of_string domid, Nativeint.of_string mfn, int_of_string port 407 | _ -> raise Invalid_Cmd_Args; 408 in 409 let dom = 410 if Domains.exist domains domid then 411 Domains.find domains domid 412 else try 413 let ndom = Xenctrl.with_intf (fun xc -> 414 Domains.create xc domains domid mfn port) in 415 Connections.add_domain cons ndom; 416 Connections.fire_spec_watches cons "@introduceDomain"; 417 ndom 418 with _ -> raise Invalid_Cmd_Args 419 in 420 if (Domain.get_remote_port dom) <> port || (Domain.get_mfn dom) <> mfn then 421 raise Domain_not_match 422 423let do_release con t domains cons data = 424 if not (Connection.is_dom0 con) 425 then raise Define.Permission_denied; 426 let domid = 427 match (split None '\000' data) with 428 | [domid;""] -> int_of_string domid 429 | _ -> raise Invalid_Cmd_Args 430 in 431 let fire_spec_watches = Domains.exist domains domid in 432 Domains.del domains domid; 433 Connections.del_domain cons domid; 434 if fire_spec_watches 435 then Connections.fire_spec_watches cons "@releaseDomain" 436 else raise Invalid_Cmd_Args 437 438let do_resume con t domains cons data = 439 if not (Connection.is_dom0 con) 440 then raise Define.Permission_denied; 441 let domid = 442 match (split None '\000' data) with 443 | domid :: _ -> int_of_string domid 444 | _ -> raise Invalid_Cmd_Args 445 in 446 if Domains.exist domains domid 447 then Domains.resume domains domid 448 else raise Invalid_Cmd_Args 449 450let function_of_type ty = 451 match ty with 452 | Xenbus.Xb.Op.Debug -> reply_data_or_ack do_debug 453 | Xenbus.Xb.Op.Watch -> reply_none do_watch 454 | Xenbus.Xb.Op.Unwatch -> reply_ack do_unwatch 455 | Xenbus.Xb.Op.Transaction_start -> reply_data do_transaction_start 456 | Xenbus.Xb.Op.Transaction_end -> reply_ack do_transaction_end 457 | Xenbus.Xb.Op.Introduce -> reply_ack do_introduce 458 | Xenbus.Xb.Op.Release -> reply_ack do_release 459 | Xenbus.Xb.Op.Isintroduced -> reply_data do_isintroduced 460 | Xenbus.Xb.Op.Resume -> reply_ack do_resume 461 | Xenbus.Xb.Op.Set_target -> reply_ack do_set_target 462 | Xenbus.Xb.Op.Reset_watches -> reply_ack do_reset_watches 463 | Xenbus.Xb.Op.Invalid -> reply_ack do_error 464 | _ -> function_of_type_simple_op ty 465 466(** 467 * Determines which individual (non-transactional) operations we want to retain. 468 * We only want to retain operations that have side-effects in the store since 469 * these can be the cause of transactions failing. 470 *) 471let retain_op_in_history ty = 472 match ty with 473 | Xenbus.Xb.Op.Write 474 | Xenbus.Xb.Op.Mkdir 475 | Xenbus.Xb.Op.Rm 476 | Xenbus.Xb.Op.Setperms -> true 477 | Xenbus.Xb.Op.Debug 478 | Xenbus.Xb.Op.Directory 479 | Xenbus.Xb.Op.Read 480 | Xenbus.Xb.Op.Getperms 481 | Xenbus.Xb.Op.Watch 482 | Xenbus.Xb.Op.Unwatch 483 | Xenbus.Xb.Op.Transaction_start 484 | Xenbus.Xb.Op.Transaction_end 485 | Xenbus.Xb.Op.Introduce 486 | Xenbus.Xb.Op.Release 487 | Xenbus.Xb.Op.Getdomainpath 488 | Xenbus.Xb.Op.Watchevent 489 | Xenbus.Xb.Op.Error 490 | Xenbus.Xb.Op.Isintroduced 491 | Xenbus.Xb.Op.Resume 492 | Xenbus.Xb.Op.Set_target 493 | Xenbus.Xb.Op.Reset_watches 494 | Xenbus.Xb.Op.Invalid -> false 495 496(** 497 * Nothrow guarantee. 498 *) 499let process_packet ~store ~cons ~doms ~con ~req = 500 let ty = req.Packet.ty in 501 let tid = req.Packet.tid in 502 let rid = req.Packet.rid in 503 try 504 let fct = function_of_type ty in 505 let t = 506 if tid = Transaction.none then 507 Transaction.make tid store 508 else 509 Connection.get_transaction con tid 510 in 511 512 let execute () = input_handle_error ~cons ~doms ~fct ~con ~t ~req in 513 514 let response = 515 (* Note that transactions are recorded in history separately. *) 516 if tid = Transaction.none && retain_op_in_history ty then begin 517 let before = Store.copy store in 518 let response = execute () in 519 let after = Store.copy store in 520 record_commit ~con ~tid ~before ~after; 521 response 522 end else execute () 523 in 524 525 let response = try 526 if tid <> Transaction.none then 527 (* Remember the request and response for this operation in case we need to replay the transaction *) 528 Transaction.add_operation ~perm:(Connection.get_perm con) t req response; 529 response 530 with Quota.Limit_reached -> 531 Packet.Error "EQUOTA" 532 in 533 534 (* Put the response on the wire *) 535 send_response ty con t rid response 536 with exn -> 537 error "process packet: %s" (Printexc.to_string exn); 538 Connection.send_error con tid rid "EIO" 539 540let do_input store cons doms con = 541 let newpacket = 542 try 543 Connection.do_input con 544 with Xenbus.Xb.Reconnect -> 545 info "%s requests a reconnect" (Connection.get_domstr con); 546 Connection.reconnect con; 547 info "%s reconnection complete" (Connection.get_domstr con); 548 false 549 | Failure exp -> 550 error "caught exception %s" exp; 551 error "got a bad client %s" (sprintf "%-8s" (Connection.get_domstr con)); 552 Connection.mark_as_bad con; 553 false 554 in 555 556 if newpacket then ( 557 let packet = Connection.pop_in con in 558 let tid, rid, ty, data = Xenbus.Xb.Packet.unpack packet in 559 let req = {Packet.tid=tid; Packet.rid=rid; Packet.ty=ty; Packet.data=data} in 560 561 (* As we don't log IO, do not call an unnecessary sanitize_data 562 info "[%s] -> [%d] %s \"%s\"" 563 (Connection.get_domstr con) tid 564 (Xenbus.Xb.Op.to_string ty) (sanitize_data data); *) 565 process_packet ~store ~cons ~doms ~con ~req; 566 write_access_log ~ty ~tid ~con:(Connection.get_domstr con) ~data; 567 Connection.incr_ops con; 568 ) 569 570let do_output store cons doms con = 571 if Connection.has_output con then ( 572 if Connection.has_new_output con then ( 573 let packet = Connection.peek_output con in 574 let tid, rid, ty, data = Xenbus.Xb.Packet.unpack packet in 575 (* As we don't log IO, do not call an unnecessary sanitize_data 576 info "[%s] <- %s \"%s\"" 577 (Connection.get_domstr con) 578 (Xenbus.Xb.Op.to_string ty) (sanitize_data data);*) 579 write_answer_log ~ty ~tid ~con:(Connection.get_domstr con) ~data; 580 ); 581 try 582 ignore (Connection.do_output con) 583 with Xenbus.Xb.Reconnect -> 584 info "%s requests a reconnect" (Connection.get_domstr con); 585 Connection.reconnect con; 586 info "%s reconnection complete" (Connection.get_domstr con) 587 ) 588 589