#!/usr/bin/python
#
# Copyright (C) 2011 Citrix Systems, Inc.
#
# This library is free software; you can redistribute it and/or modify
# it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; If not, see <http://www.gnu.org/licenses/>.
#

"""Overview:

 - Gather Xen I/O ring states
   (from %s/*/ring)

 - Update ring states every -T seconds.

 - Determine if rings are idle or make progress.

 - Determine if idle rings dropped notifications (%s).

 - Instruct stuck backends to reissue notifications.
"""

# NB. The module docstring above is a runtime format string: the help
# screen prints it with (XenBackend.SYSFS_BASEDIR, RingWatch.STCK)
# substituted for the two %s placeholders. Keep exactly two of them.
#
# The code below is written to run on both Python 2.6+ and Python 3.

import glob
import os
import re
import sys
import time
from getopt import GetoptError, gnu_getopt


class Pattern(object):
    """A regex pattern. Compiled on demand, then persisted."""

    def __init__(self, regex):
        self.regex = regex
        self.__pattern = None

    def get(self):
        """Return the compiled pattern, compiling it on first use."""
        if self.__pattern is None:
            self.__pattern = re.compile(self.regex)

        return self.__pattern

    def search(self, s):
        """Search `s` with the compiled pattern; a match object or None."""
        return self.get().search(s)


class XenBackend(object):
    """A Xen I/O backend.

    Base class for one backend device directory below SYSFS_BASEDIR,
    identified by two integers (`rd`, `devid`) parsed from the
    directory name. Subclasses provide name(), `_name_pattern`,
    `_name_glob`, XEN_BACKEND_NAME and a nested Ring subclass.
    """

    SYSFS_BASEDIR = "/sys/devices/xen-backend"

    # Pattern with two groups (rd, devid) matching a backend directory
    # name. Set by subclasses.
    _name_pattern = None

    # Glob matching the backend directory names of this type.
    # Set by subclasses.
    _name_glob = None

    # Registry of backend classes, keyed by XEN_BACKEND_NAME.
    TYPES = {}
    XEN_BACKEND_NAME = None

    def __init__(self, rd, devid):
        self.rd = int(rd)
        self.devid = int(devid)

    def __repr__(self):
        return "%s(%d, %d)" % (type(self).__name__,
                               self.rd, self.devid)

    def name(self):
        """Return the sysfs directory name. Implemented by subclasses."""
        raise NotImplementedError

    def path(self):
        """Return the sysfs directory path of this backend."""
        return "%s/%s" % (self.SYSFS_BASEDIR, self.name())

    @classmethod
    def from_name(cls, name):
        """Build a backend from its sysfs directory name.

        Raises ValueError if `name` does not match `_name_pattern`.
        """
        match = cls._name_pattern.search(name)
        if not match:
            # NB. this is a classmethod, there is no instance to
            # take the type name from.
            raise ValueError("Malformed %s name: %s" %
                             (cls.__name__, name))

        rd = match.group(1)
        devid = match.group(2)

        return cls(rd, devid)

    @classmethod
    def find(cls):
        """Yield one backend per directory matching `_name_glob`."""
        paths = glob.glob("%s/%s" % (cls.SYSFS_BASEDIR,
                                     cls._name_glob))
        for path in paths:
            name = os.path.basename(path)
            yield cls.from_name(name)

    def find_rings(self):
        """Yield the rings found below this backend's directory."""
        for ring in self.Ring.find(self):
            yield ring

    class Ring(object):
        """A ring node (a sysfs file) below a backend directory."""

        # Glob matching the ring file name(s). Set by subclasses.
        _name_glob = None

        def __init__(self, backend, name):
            self.backend = backend
            self.name = name

        def key(self):
            """Return "<backend name>/<ring name>", used as watch key."""
            return "%s/%s" % (self.backend.name(),
                              self.name)

        def __str__(self):
            return "%s(%s)" % (type(self).__name__, self.key())

        @classmethod
        def from_name(cls, backend, name):
            return cls(backend, name)

        @classmethod
        def find(cls, backend):
            """Yield one ring per file matching `_name_glob`."""
            paths = glob.glob("%s/%s" % (backend.path(),
                                         cls._name_glob))
            for path in paths:
                name = os.path.basename(path)
                yield cls.from_name(backend, name)

        def path(self):
            """Return the sysfs path of the ring node."""
            return "%s/%s" % (self.backend.path(),
                              self.name)

        def read(self):
            """Read and parse the ring node into a RingState.

            May raise IOError (e.g. when the node is gone) or
            ValueError (unparsable content).
            """
            return RingState.from_sysfs(self.path())

        def write(self, cmd):
            """Write `cmd`, stripped of trailing whitespace, to the node."""
            with open(self.path(), 'w') as f:
                f.write(cmd.rstrip())

        def kick(self):
            """Write the "kick" command to the ring node."""
            self.write("kick")

        def poll(self):
            """Write the "poll" command to the ring node."""
            self.write("poll")

    @classmethod
    def register(cls):
        """Add this class to XenBackend.TYPES under XEN_BACKEND_NAME."""
        XenBackend.TYPES[cls.XEN_BACKEND_NAME] = cls


# NB. the VBD/VIF docstrings are printed verbatim by the help screen.

class VBD(XenBackend):
    """Xen blkif backends."""

    XEN_BACKEND_NAME = 'vbd'

    _name_pattern = Pattern(r"vbd-(\d+)-(\d+)")
    _name_glob = "vbd-*-*"

    def name(self):
        return "vbd-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        _name_glob = "io_ring"

VBD.register()


class VIF(XenBackend):
    """Xen netif backends."""

    XEN_BACKEND_NAME = 'vif'

    _name_pattern = Pattern(r"vif-(\d+)-(\d+)")
    _name_glob = "vif-*-*"

    def name(self):
        return "vif-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # NOTE(review): glob.glob() does not perform shell-style brace
        # expansion, so this only matches a file literally named
        # "{rx,tx}_ring". Needs fixing before VIF gets registered.
        _name_glob = "{rx,tx}_ring"

# VIF is not registered, so it is neither watched by default nor
# selectable through -t.
#VIF.register()


class RingState(object):
    """Overall backend ring state. Comprising req and rsp queue
    indexes, and analysis."""

    # Defines __eq__, so instances are not meant to be hashed.
    __hash__ = None

    _size_pattern = Pattern(r"nr_ents (\d+)")

    def __init__(self, size, req, rsp):
        self.size = int(size)
        self.req = req
        self.rsp = rsp

    @classmethod
    def from_sysfs(cls, path):
        """Read the ring node at `path` and parse it.

        Expects exactly three newline-terminated lines: an "nr_ents"
        line, a "req" line and a "rsp" line.

        Raises IOError if the node cannot be read, ValueError if the
        content is malformed.
        """
        with open(path, "r") as f:
            s = f.read()

        try:
            # Three lines plus the empty tail after the last newline.
            (_nr_ents, _req, _rsp, _) = s.split("\n")

            match = cls._size_pattern.search(_nr_ents)
            nr_ents = int(match.group(1))

        except Exception as e:
            raise ValueError("Malformed %s input: %s (%s)" %
                             (cls.__name__, repr(s), str(e)))

        req = cls.Req.from_sysfs(_req, size=nr_ents)
        rsp = cls.Rsp.from_sysfs(_rsp, size=nr_ents)

        return cls(nr_ents, req, rsp)

    class Queue(dict):
        """Common base of the request and response queue states.

        Subclasses provide `_pattern`, whose groups alternate between
        a field name and its value, and _cons().
        """

        prod = None

        def __init__(self, size):
            self.size = int(size)

        @classmethod
        def from_sysfs(cls, line, **d):
            """Parse one queue line into a queue state.

            Extra keyword arguments (e.g. `size`) are passed on to the
            constructor next to the parsed fields.

            Raises ValueError if `line` does not match `_pattern`.
            """
            match = cls._pattern.search(line)
            if not match:
                raise ValueError("Malformed %s input: %s" %
                                 (cls.__name__, repr(line)))

            # Groups come in (name, value) pairs.
            groups = match.groups()
            d.update(zip(groups[0::2], groups[1::2]))

            return cls(**d)

        def _cons(self):
            """Return the consumer index. Implemented by subclasses."""
            raise NotImplementedError

        def is_consumed(self):
            """True if the consumer index caught up with the producer."""
            return self.prod == self._cons()

        def __ne__(self, other):
            # Python 2 does not derive != from ==.
            return not self == other

    class Req(Queue):
        """Request queue state: prod, cons and event indexes."""

        _pattern = Pattern(r"req (prod) (\d+) (cons) (\d+) (event) (\d+)")

        def __init__(self, prod, cons, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = int(prod)
            self.cons = int(cons)
            self.event = int(event)

        def __repr__(self):
            return "%s(prod=%d, cons=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.cons, self.event)

        def _cons(self):
            return self.cons

        def __eq__(self, other):
            if not isinstance(other, type(self)):
                return NotImplemented
            return (self.prod == other.prod and
                    self.cons == other.cons and
                    self.event == other.event)

    class Rsp(Queue):
        """Response queue state: prod, pvt and event indexes."""

        _pattern = Pattern(r"rsp (prod) (\d+) (pvt) (\d+) (event) (\d+)")

        def __init__(self, prod, pvt, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod = int(prod)
            self.pvt = int(pvt)
            self.event = int(event)

        def __repr__(self):
            return "%s(prod=%d, pvt=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.pvt, self.event)

        def _cons(self):
            # No consumer index is reported for this queue; it is
            # derived from the event index.
            # NOTE(review): no index wrap-around handling (event == 0
            # yields -1) -- confirm how the kernel prints the indexes.
            return self.event - 1

        def __eq__(self, other):
            if not isinstance(other, type(self)):
                return NotImplemented
            return (self.prod == other.prod and
                    self.pvt == other.pvt and
                    self.event == other.event)

    def is_consumed(self):
        """True if both the rsp and the req queue are consumed."""
        return self.rsp.is_consumed() and self.req.is_consumed()

    def is_pending(self):
        """True if the rsp and req producer indexes differ."""
        return self.rsp.prod != self.req.prod

    def kick(self, ring):
        """Prod `ring` for every queue which is not consumed.

        Issues ring.poll() for an unconsumed req queue and ring.kick()
        for an unconsumed rsp queue. Returns True if any command was
        issued.
        """
        action = False

        if not self.req.is_consumed():
            action = True
            ring.poll()

        if not self.rsp.is_consumed():
            action = True
            ring.kick()

        return action

    def __eq__(self, other):
        if not isinstance(other, RingState):
            return NotImplemented
        return (self.size == other.size and
                self.req == other.req and
                self.rsp == other.rsp)

    def __ne__(self, other):
        # Python 2 does not derive != from ==.
        return not self == other

    def __repr__(self):
        return "%s(size=%d, %s, %s)" % \
            (type(self).__name__, self.size, self.req, self.rsp)

    def display(self):
        """Return a one-line summary of the state and its analysis."""
        complete = {True: "complete", False: "pending"}

        io = complete[not self.is_pending()]
        req = complete[self.req.is_consumed()]
        rsp = complete[self.rsp.is_consumed()]

        return "%s: io: %s, req: %s, rsp: %s" % (self, io, req, rsp)


class RingWatch(object):
    """State machine watching I/O individual ring state"""

    _NEW = "_NEW"
    BUSY = "BUSY"
    IDLE = "IDLE"
    STCK = "STCK"

    # Printed by the help screen.
    COMMENTS = {BUSY: "Message traffic observed (OK)",
                IDLE: "No messages observed (Ring OK, I/O depends)",
                STCK: "No pending req/rsp consumer progress observed (BUG)"}

    def __init__(self, ring, state):
        self.ring = ring
        self.state = state
        self.status = RingWatch._NEW

    @classmethod
    def new(cls, ring):
        """Start watching `ring`, taking an initial state sample."""
        state = ring.read()
        return cls(ring, state)

    def __str__(self):
        return "%s(%s)[%s]" % \
            (type(self).__name__, self.ring.key(), self.status)

    def is_stuck(self):
        return self.status == self.STCK

    def is_idle(self):
        return self.status == self.IDLE

    def kick(self):
        """Kick the ring if stuck.

        Returns the result of RingState.kick(), or None if the ring is
        not stuck.
        """
        if self.is_stuck():
            return self.state.kick(self.ring)

    def update(self):
        """Take a new state sample and reclassify the ring.

        A changed state means BUSY. An unchanged state means IDLE if
        both queues are consumed, STCK otherwise.
        """
        prev = self.state
        curr = self.ring.read()

        if curr == prev:
            if not curr.is_consumed():
                self.status = self.STCK
            else:
                self.status = self.IDLE
        else:
            self.status = self.BUSY

        self.state = curr

    def display(self):
        return "%s: %s" % (self,
                           self.state.display())


class WatchList(object):
    """Managed collection of I/O rings under surveillance."""

    def __init__(self, gen):
        # gen: callable returning an iterable over the rings to watch.
        self.gen = gen
        self.list = {}

    def update(self):
        """Resample all rings currently produced by `gen`."""

        # NB. clear the watch list, then rebuild it. new entries get
        # added, existing ones updated, those gone discarded.
        prev = self.list
        self.list = {}

        for ring in self.gen():

            key = ring.key()
            entry = prev.get(key)

            try:
                if entry is None:
                    entry = RingWatch.new(ring)
                else:
                    entry.update()

            except IOError:
                # NB. racing unplug, any ring.read() may raise.
                # nothing left to memorize then.
                pass
            else:
                self.list[key] = entry

    def __iter__(self):
        return iter(self.list.values())

    def pending(self):
        """Yield idle entries whose state is_pending()."""
        for entry in self:
            if entry.is_idle() and entry.state.is_pending():
                yield entry

    def stuck(self):
        """Yield entries classified as stuck."""
        for entry in self:
            if entry.is_stuck():
                yield entry

    def kick(self):
        """Kick every stuck entry."""
        for entry in self.stuck():
            try:
                entry.kick()
            except IOError:
                # NB. racing unplug, any ring.write() may raise.
                pass


def _emit(stream, text=""):
    """Write `text` plus a newline to `stream`."""
    stream.write("%s\n" % text)


def main(argv=None):
    """Command line entry point.

    `argv` is the full argument vector including the program name,
    defaulting to sys.argv. Returns 0 on success; usage errors exit
    with status 1.
    """
    if argv is None:
        argv = sys.argv

    DEFAULT_PERIOD = 1  # secs

    verbose = 0
    period = DEFAULT_PERIOD
    backends = list(XenBackend.TYPES.values())
    kick = False
    iowatch = False

    OPTIONS = ((('h', 'help'),
                "Print this help screen."),

               (('v', 'verbose'),
                "Increase output verbosity level (use n-times)."),

               (('I', 'io'),
                "Watch out for stuck I/O (not messaging), too. (%s)" %
                (iowatch)),

               # NB. must agree with the long option passed to getopt.
               (('t', 'type'),
                "Comma separated list of backend types to watch. (%s)" %
                ",".join(t.XEN_BACKEND_NAME for t in backends)),

               (('T', 'period'),
                "Watch update period. (%d) [secs]" %
                (period)),

               (('k', 'kick'),
                "Kick broken guests out of cardiac arrest. (%s)" %
                (kick)))

    COMMANDS = {"check":
                "Single iteration quick test (takes -T seconds)."}

    def usage(stream):
        prog = os.path.basename(argv[0])

        _emit(stream)

        _emit(stream, "Usage:")
        _emit(stream, "\t%s [options] {%s}" % (prog, "|".join(COMMANDS)))

        _emit(stream)

        _emit(stream, "Commands:")
        for (name, desc) in COMMANDS.items():
            _emit(stream, "\t%s: \t%s" % (name, desc))

        _emit(stream)

        _emit(stream, "Options:")
        for ((short, _long), desc) in OPTIONS:
            _emit(stream, "\t-%s, --%s: \t%s" % (short, _long, desc))

        _emit(stream)

    def fail(msg=None):
        if msg:
            _emit(sys.stderr, "Error: %s" % msg)
        usage(sys.stderr)
        sys.exit(1)

    def show_help():
        out = sys.stdout

        usage(out)

        # The module docstring may be absent (e.g. under python -OO).
        if __doc__:
            _emit(out, __doc__ % (XenBackend.SYSFS_BASEDIR, RingWatch.STCK))

        _emit(out, "Backend Types:")
        for k, v in XenBackend.TYPES.items():
            _emit(out, "\t%s: \t%s (%s)" % (k, v.__doc__, v._name_glob))

        _emit(out)
        _emit(out, "Ring States:")
        for k, v in RingWatch.COMMENTS.items():
            _emit(out, "\t%s: \t%s" % (k, v))

        _emit(out)

    try:
        opts, args = gnu_getopt(argv[1:],
                                "hIkt:vT:",
                                ["help",
                                 "io",
                                 "kick",
                                 "type=",
                                 "verbose",
                                 "period="])
    except GetoptError as e:
        fail(str(e))

    for (o, arg) in opts:
        try:
            if o in ('-h', '--help'):
                show_help()
                return 0

            elif o in ('-v', '--verbose'):
                verbose += 1

            elif o in ('-I', '--io'):
                iowatch = True

            elif o in ('-T', '--period'):
                period = int(arg)
                if period < 0:
                    raise ValueError(arg)

            elif o in ('-t', '--type'):
                names = arg.split(",")
                # Report unknown types like any other bad argument.
                for name in names:
                    if name not in XenBackend.TYPES:
                        raise ValueError(name)
                backends = [XenBackend.TYPES[name] for name in names]

            elif o in ('-k', '--kick'):
                kick = True

            else:
                raise AssertionError("BUG: option %s unhandled." % o)

        except ValueError:
            fail("%s: invalid argument '%s'." % (o, arg))

    if not args:
        fail("Missing command.")
    cmd = args[0]

    def ring_select():
        for _type in backends:
            for backend in _type.find():
                for ring in backend.find_rings():
                    yield ring

    def show(entries):
        for watch in entries:
            _emit(sys.stdout, watch.display())

    def pause():
        time.sleep(period)

    watches = WatchList(ring_select)

    if cmd == "check":

        # init
        watches.update()

        if verbose >= 2:
            show(watches)

        # watch for one round
        pause()
        watches.update()

        # show result
        crit = list(watches.stuck())
        stuck = bool(crit)

        if iowatch:
            crit.extend(watches.pending())

        if verbose >= 1:
            show(watches)
        elif crit:
            show(crit)

        if stuck and kick:
            # deal with it
            watches.kick()

    else:
        fail("Invalid command.")

    return 0


if __name__ == '__main__':
    sys.exit(main())