1#!/usr/bin/python
2#
3# Copyright (C) 2011 Citrix Systems, Inc.
4#
5# This library is free software; you can redistribute it and/or modify
6# it under the terms of version 2.1 of the GNU Lesser General Public
7# License as published by the Free Software Foundation.
8#
9# This library is distributed in the hope that it will be useful, but
10# WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library; If not, see <http://www.gnu.org/licenses/>.
16#
17
18"""Overview:
19
20        - Gather Xen I/O ring states
21          (from %s/*/ring)
22
23        - Update ring states every -T seconds.
24
25        - Determine if rings are idle or make progress.
26
27        - Determine if idle rings dropped notifications (%s).
28
29        - Instruct stuck backends to reissue notifications.
30"""
31
32import os
33import glob
34
class Pattern(object):
    """Lazily compiled regular expression.

    The regex source is stored at construction; the compiled object is
    built on first use and then cached on the instance.
    """

    def __init__(self, regex):
        self.regex = regex
        self.__pattern = None

    def get(self):
        """Return the compiled pattern, compiling it on the first call."""
        import re

        compiled = self.__pattern
        if not compiled:
            compiled = re.compile(self.regex)
            self.__pattern = compiled
        return compiled

    def search(self, s):
        """Search *s* with the compiled pattern; return a match or None."""
        compiled = self.get()
        return compiled.search(s)
52
class XenBackend(object):
    """A Xen I/O backend."""

    # All backend device directories live directly below this sysfs path.
    SYSFS_BASEDIR = "/sys/devices/xen-backend"

    def __init__(self, rd, devid):
        # rd/devid are the two numbers encoded in the sysfs directory
        # name (presumably remote domain id and device id -- TODO confirm).
        self.rd    = int(rd)
        self.devid = int(devid)

    def __repr__(self):
        return "%s(%d, %d)" % (type(self).__name__,
                               self.rd, self.devid)

    def name(self):
        """Return this backend's sysfs directory name (subclass hook)."""
        raise NotImplementedError

    def path(self):
        """Return the absolute sysfs directory of this backend."""
        return "%s/%s" % (self.SYSFS_BASEDIR, self.name())

    # Subclasses set this to a Pattern whose groups 1 and 2 capture the
    # (rd, devid) numbers from a sysfs directory name.
    _name_pattern = None

    @classmethod
    def from_name(cls, name):
        """Build a backend instance from its sysfs directory *name*.

        Raises Exception if *name* does not match cls._name_pattern.
        """
        match = cls._name_pattern.search(name)
        if not match:
            # NB. classmethod: there is no instance here, so report the
            # class name via cls (type(self) would raise NameError).
            raise Exception("Malformed %s name: %s" % (cls.__name__, name))

        return cls(match.group(1), match.group(2))

    # Subclasses set this to a glob matching their sysfs directory names.
    _name_glob = None

    @classmethod
    def find(cls):
        """Yield one instance per matching directory under SYSFS_BASEDIR."""
        paths = glob.glob("%s/%s" % (cls.SYSFS_BASEDIR,
                                     cls._name_glob))
        for path in paths:
            yield cls.from_name(os.path.basename(path))

    def find_rings(self):
        """Yield the rings of this backend, using the subclass' Ring type."""
        for ring in self.Ring.find(self):
            yield ring

    class Ring(object):
        """One ring file inside a backend's sysfs directory."""

        def __init__(self, backend, name):
            self.backend = backend
            self.name    = name

        def key(self):
            """Return "<backend name>/<ring name>", unique per ring."""
            return "%s/%s" % (self.backend.name(),
                              self.name)

        def __str__(self):
            return "%s(%s)" % (type(self).__name__, self.key())

        @classmethod
        def from_name(cls, backend, name):
            return cls(backend, name)

        # Subclasses set this to a glob matching their ring file names.
        _name_glob = None

        @classmethod
        def find(cls, backend):
            """Yield one Ring per matching file in *backend*'s directory."""
            paths = glob.glob("%s/%s" % (backend.path(),
                                         cls._name_glob))
            for path in paths:
                yield cls.from_name(backend, os.path.basename(path))

        def path(self):
            """Return the absolute sysfs path of this ring file."""
            return "%s/%s" % (self.backend.path(),
                              self.name)

        def read(self):
            """Read and parse the current ring state (a RingState)."""
            return RingState.from_sysfs(self.path())

        def write(self, cmd):
            """Write *cmd* (trailing whitespace stripped) to the ring file."""
            f = open(self.path(), 'w')
            try:
                f.write(cmd.rstrip())
            finally:
                f.close()

        def kick(self):
            """Write the "kick" command to the ring file."""
            self.write("kick")

        def poll(self):
            """Write the "poll" command to the ring file."""
            self.write("poll")

    # Registry of backend classes, keyed by XEN_BACKEND_NAME.
    TYPES = {}
    XEN_BACKEND_NAME = None

    @classmethod
    def register(cls):
        """Add this backend class to XenBackend.TYPES."""
        XenBackend.TYPES[cls.XEN_BACKEND_NAME] = cls
158
class VBD(XenBackend):
    """Xen blkif backends."""

    XEN_BACKEND_NAME = 'vbd'

    # sysfs directories are named "vbd-<rd>-<devid>". Raw string so that
    # "\d" is not treated as a (invalid) string escape.
    _name_pattern = Pattern(r"vbd-(\d+)-(\d+)")
    _name_glob    = "vbd-*-*"

    def name(self):
        return "vbd-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # A vbd backend directory exposes a single "io_ring" file.
        _name_glob = "io_ring"

VBD.register()
174
class VIF(XenBackend):
    """Xen netif backends."""

    XEN_BACKEND_NAME = 'vif'

    # sysfs directories are named "vif-<rd>-<devid>". Raw string so that
    # "\d" is not treated as a (invalid) string escape.
    _name_pattern = Pattern(r"vif-(\d+)-(\d+)")
    _name_glob    = "vif-*-*"

    def name(self):
        return "vif-%d-%d" % (self.rd, self.devid)

    class Ring(XenBackend.Ring):
        # NB. glob.glob() has no shell-style brace expansion: the former
        # "{rx,tx}_ring" only matched a file literally named that. The
        # character class matches both "rx_ring" and "tx_ring".
        _name_glob = "[rt]x_ring"

#VIF.register()
190
class RingState(object):
    """Overall backend ring state. Comprising req and rsp queue
    indexes, and analysis."""

    def __init__(self, size, req, rsp):
        # size: ring size (nr_ents); req/rsp: RingState.Req and
        # RingState.Rsp index snapshots.
        self.size = int(size)
        self.req  = req
        self.rsp  = rsp

    _size_pattern = Pattern(r"nr_ents (\d+)")

    @classmethod
    def from_sysfs(cls, path):
        """Read and parse the ring state file at *path*.

        The content must consist of exactly three newline-terminated
        lines: an "nr_ents N" line, a req line and a rsp line (anything
        after the third newline is ignored).

        Raises IOError if the file cannot be read, and Exception if the
        content is malformed.
        """
        f = open(path, "r")
        try:
            s = f.read()
        finally:
            f.close()

        lines = s.split("\n")
        if len(lines) != 4:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s),
                             "expected nr_ents, req and rsp lines"))

        (_nr_ents, _req, _rsp, _) = lines

        match = cls._size_pattern.search(_nr_ents)
        if not match:
            raise Exception("Malformed %s input: %s (%s)" %
                            (cls.__name__, repr(s), "no nr_ents field"))
        nr_ents = int(match.group(1))

        req = cls.Req.from_sysfs(_req, size=nr_ents)
        rsp = cls.Rsp.from_sysfs(_rsp, size=nr_ents)

        return cls(nr_ents, req, rsp)

    class Queue(dict):
        """Common base of the req/rsp index snapshots.

        Subclasses provide _pattern (alternating name/value capture
        groups), _cons() and __eq__.
        """

        def __init__(self, size):
            self.size = int(size)

        prod = None

        @classmethod
        def from_sysfs(cls, line, **d):
            """Parse one queue line; extra keywords (size) are passed on.

            Raises Exception if *line* does not match cls._pattern.
            """
            match = cls._pattern.search(line)
            if not match:
                raise Exception("Malformed %s input: %s" %
                                (cls.__name__, repr(line)))

            # Groups alternate (name, value): pair them up into the
            # constructor's keyword arguments.
            groups = match.groups()
            for k, v in zip(groups[0::2], groups[1::2]):
                d[k] = v

            return cls(**d)

        def is_consumed(self):
            """True if the consumer index has caught up with prod."""
            return self.prod == self._cons()

        def __ne__(self, other):
            # Without this, Python 2 would use dict.__ne__ (comparing the
            # always-empty dict contents) instead of the subclass __eq__.
            return not self == other

    class Req(Queue):

        _pattern = Pattern(r"req (prod) (\d+) (cons) (\d+) (event) (\d+)")

        def __init__(self, prod, cons, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod  = int(prod)
            self.cons  = int(cons)
            self.event = int(event)

        def __repr__(self):
            return "%s(prod=%d, cons=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.cons, self.event)

        def _cons(self):
            return self.cons

        def __eq__(self, other):
            return \
                self.prod  == other.prod and \
                self.cons  == other.cons and \
                self.event == other.event

    class Rsp(Queue):

        _pattern = Pattern(r"rsp (prod) (\d+) (pvt) (\d+) (event) (\d+)")

        def __init__(self, prod, pvt, event, **d):
            RingState.Queue.__init__(self, **d)
            self.prod  = int(prod)
            self.pvt   = int(pvt)
            self.event = int(event)

        def __repr__(self):
            return "%s(prod=%d, pvt=%d, event=%d)" % \
                (type(self).__name__, self.prod, self.pvt, self.event)

        def _cons(self):
            # NOTE(review): presumably the frontend sets rsp event to its
            # consumer index + 1, so event - 1 is the last consumed rsp.
            # TODO confirm against the Xen ring protocol.
            return self.event - 1

        def __eq__(self, other):
            return \
                self.prod  == other.prod and \
                self.pvt   == other.pvt  and \
                self.event == other.event

    def is_consumed(self):
        """True if both the rsp and the req queue are consumed."""
        return \
            self.rsp.is_consumed() and \
            self.req.is_consumed()

    def is_pending(self):
        """True if rsp prod has not caught up with req prod."""
        return self.rsp.prod != self.req.prod

    def kick(self, ring):
        """Nudge *ring* where entries are unconsumed.

        Calls ring.poll() for unconsumed requests and ring.kick() for
        unconsumed responses. Returns True if either was called.
        """
        action = False

        if not self.req.is_consumed():
            action = True
            ring.poll()

        if not self.rsp.is_consumed():
            action = True
            ring.kick()

        return action

    def __eq__(self, other):
        return \
            self.size == other.size and \
            self.req == other.req and \
            self.rsp == other.rsp

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        return not self == other

    def __repr__(self):
        return "%s(size=%d, %s, %s)" % \
            (type(self).__name__, self.size, self.req, self.rsp)

    def display(self):
        """Return a one-line summary of io/req/rsp completion."""
        complete = { True: "complete", False: "pending" }

        io  = complete[not self.is_pending()]
        req = complete[self.req.is_consumed()]
        rsp = complete[self.rsp.is_consumed()]

        return "%s: io: %s, req: %s, rsp: %s" % (self, io, req, rsp)
335
class RingWatch(object):
    """Track a single ring across successive snapshots.

    Every update() re-reads the ring and compares the new snapshot with
    the previous one: any change means BUSY; no change with everything
    consumed means IDLE; no change with unconsumed entries means STCK.
    A freshly created watch carries the internal _NEW status.
    """

    _NEW = "_NEW"
    BUSY = "BUSY"
    IDLE = "IDLE"
    STCK = "STCK"

    # Human-readable explanation of each externally reported status.
    COMMENTS = {
        BUSY: "Message traffic observed (OK)",
        IDLE: "No messages observed (Ring OK, I/O depends)",
        STCK: "No pending req/rsp consumer progress observed (BUG)",
    }

    def __init__(self, ring, state):
        self.ring = ring
        self.state = state
        self.status = RingWatch._NEW

    @classmethod
    def new(cls, ring):
        """Create a watch seeded with the ring's current snapshot."""
        initial = ring.read()
        return cls(ring, initial)

    def __str__(self):
        label = type(self).__name__
        return "%s(%s)[%s]" % (label, self.ring.key(), self.status)

    def is_stuck(self):
        return self.STCK == self.status

    def is_idle(self):
        return self.IDLE == self.status

    def kick(self):
        """Let the current snapshot kick the ring if stuck; else None."""
        if not self.is_stuck():
            return None
        return self.state.kick(self.ring)

    def update(self):
        """Take a fresh snapshot and reclassify the ring."""
        previous = self.state
        current = self.ring.read()

        unchanged = (current == previous)
        if not unchanged:
            self.status = self.BUSY
        elif current.is_consumed():
            self.status = self.IDLE
        else:
            self.status = self.STCK

        self.state = current

    def display(self):
        summary = self.state.display()
        return "%s: %s" % (self, summary)
390
class WatchList(object):
    """Managed collection of I/O rings under surveillance."""

    def __init__(self, gen):
        # gen: callable returning an iterable of rings to watch; it is
        # called afresh on every update().
        self.gen  = gen
        self.list = {}

    def update(self):
        """Rebuild the watch list from gen().

        Rings seen for the first time get a new RingWatch, rings already
        watched are updated in place, and rings no longer produced by
        gen() -- or whose read raises IOError -- are dropped.
        """
        prev      = self.list
        self.list = {}

        for ring in self.gen():
            key   = ring.key()
            entry = prev.get(key)

            try:
                if entry is None:
                    entry = RingWatch.new(ring)
                else:
                    entry.update()
            except IOError:
                # NB. racing unplug, any ring.read() may raise.
                # nothing left to memorize then.
                continue

            self.list[key] = entry

    def __iter__(self):
        return iter(self.list.values())

    def pending(self):
        """Yield idle entries whose ring state still has pending I/O."""
        for entry in self:
            if entry.is_idle() and entry.state.is_pending():
                yield entry

    def stuck(self):
        """Yield entries currently classified as stuck."""
        for entry in self:
            if entry.is_stuck():
                yield entry

    def kick(self):
        """Kick every stuck entry, ignoring rings that vanished."""
        for entry in self.stuck():
            try:
                entry.kick()
            except IOError:
                # NB. racing unplug, any ring.write() may raise.
                pass
443
if __name__ == '__main__':
    from sys import argv, stdout, stderr, exit, exc_info
    from getopt import gnu_getopt, GetoptError

    DEFAULT_PERIOD = 1 # secs

    verbose  = 0
    period   = DEFAULT_PERIOD
    # NB. a real list (not a dict view or map iterator): ring_select()
    # iterates it again on every WatchList.update().
    backends = list(XenBackend.TYPES.values())
    kick     = False
    iowatch  = False

    OPTIONS = ((('h', 'help'),
                "Print this help screen."),

               (('v', 'verbose'),
                "Increase output verbosity level (use n-times)."),

               (('I', 'io'),
                "Watch out for stuck I/O (not messaging), too. (%s)" % \
                    (iowatch)),

               (('t', 'type'),
                "Comma separated list of backend types to watch. (%s)" % \
                    ",".join([t.XEN_BACKEND_NAME for t in backends])),

               (('T', 'period'),
                "Watch update period. (%d) [secs]" % \
                    (period)),

               (('k', 'kick'),
                "Kick broken guests out of cardiac arrest. (%s)" % \
                    (kick))
               )

    COMMANDS = {"check":
                    "Single iteration quick test (takes -T seconds)."}

    def usage(stream):
        """Write the command and option summary to *stream*."""
        prog = os.path.basename(argv[0])

        lines = ["",
                 "Usage:",
                 "\t%s [options] {%s}" % (prog, "|".join(COMMANDS)),
                 "",
                 "Commands:"]
        for (name, desc) in COMMANDS.items():
            lines.append("\t%s: \t%s" % (name, desc))
        lines.append("")
        lines.append("Options:")
        for ((short, _long), desc) in OPTIONS:
            lines.append("\t-%s, --%s: \t%s" % (short, _long, desc))
        lines.append("")

        stream.write("\n".join(lines) + "\n")

    def fail(msg = None):
        """Report *msg* (if any) and the usage on stderr, then exit(1)."""
        if msg:
            stderr.write("Error: %s\n" % msg)
        usage(stderr)
        exit(1)

    def print_help():
        """Write usage, overview, backend types and ring states to stdout."""
        usage(stdout)

        lines = [__doc__ % (XenBackend.SYSFS_BASEDIR, RingWatch.STCK),
                 "Backend Types:"]
        for k, v in XenBackend.TYPES.items():
            lines.append("\t%s: \t%s (%s)" % (k, v.__doc__, v._name_glob))
        lines.append("")
        lines.append("Ring States:")
        for k, v in RingWatch.COMMENTS.items():
            lines.append("\t%s: \t%s" % (k, v))
        lines.append("")

        stdout.write("\n".join(lines) + "\n")

    try:
        opts, args = gnu_getopt(argv[1:],
                                "hIkt:vT:",
                                ["help",
                                 "io",
                                 "kick",
                                 "type=",
                                 "verbose",
                                 "period="])
    except GetoptError:
        fail(str(exc_info()[1]))

    for (o, arg) in opts:
        try:
            if o in ('-h', '--help'):
                print_help()
                exit(0)

            elif o in ('-v', '--verbose'):
                verbose += 1

            elif o in ('-I', '--io'):
                iowatch = True

            elif o in ('-T', '--period'):
                period = int(arg)
                if period < 0:
                    # NB. a negative sleep period is invalid.
                    raise ValueError(arg)

            elif o in ('-t', '--type'):
                # Split the argument on commas (not the other way round);
                # an unknown type name raises KeyError, reported below.
                backends = [XenBackend.TYPES[t] for t in arg.split(",")]

            elif o in ('-k', '--kick'):
                kick = True

            else:
                raise Exception("BUG: option %s unhandled." % o)

        except (ValueError, KeyError):
            fail("%s: invalid argument '%s'." % (o, arg))

    if not args:
        fail("Missing command.")
    cmd = args[0]

    def ring_select():
        """Yield every ring of every selected backend type."""
        for _type in backends:
            for backend in _type.find():
                for ring in backend.find_rings():
                    yield ring

    def show(entries):
        """Print one display() line per watch entry."""
        for watch in entries:
            stdout.write(watch.display() + "\n")

    def pause():
        import time
        time.sleep(period)

    watches = WatchList(ring_select)

    if cmd == "check":

        # init
        watches.update()

        if verbose >= 2:
            show(watches)

        # watch for one round
        pause()
        watches.update()

        # show result: stuck rings are always critical; with -I, idle
        # rings that still have pending I/O are reported as well.
        crit  = list(watches.stuck())
        stuck = bool(crit)

        if iowatch:
            crit.extend(watches.pending())

        if verbose >= 1:
            show(watches)
        elif crit:
            show(crit)

        if stuck and kick:
            # deal with it
            watches.kick()

    else:
        fail("Invalid command.")
618