1#!/usr/bin/env python3
2
3# Copyright (c) 2019 Nordic Semiconductor ASA
4# SPDX-License-Identifier: Apache-2.0
5
6"""
7Lists maintainers for files or commits. Similar in function to
8scripts/get_maintainer.pl from Linux, but geared towards GitHub. The mapping is
9in MAINTAINERS.yml.
10
11The comment at the top of MAINTAINERS.yml in Zephyr documents the file format.
12
13See the help texts for the various subcommands for more information. They can
14be viewed with e.g.
15
16    ./get_maintainer.py path --help
17
18This executable doubles as a Python library. Identifiers not prefixed with '_'
19are part of the library API. The library documentation can be viewed with this
20command:
21
22    $ pydoc get_maintainer
23"""
24
25import argparse
26import operator
27import os
28import pathlib
29import re
30import shlex
31import subprocess
32import sys
33from tabulate import tabulate
34
35from yaml import load, YAMLError
36try:
37    # Use the speedier C LibYAML parser if available
38    from yaml import CSafeLoader as SafeLoader
39except ImportError:
40    from yaml import SafeLoader
41
42
def _main():
    # Script entry point: parses the command line, loads the maintainers
    # file, and dispatches to the handler of the selected subcommand.
    # Maintainers-file and Git problems are reported via _serr() instead of
    # as tracebacks.

    cli_args = _parse_args()
    try:
        maintainers = Maintainers(cli_args.maintainers)
        cli_args.cmd_fn(maintainers, cli_args)
    except (MaintainersError, GitError) as error:
        _serr(error)
51
52
def _parse_args():
    # Parses arguments when run as an executable.
    #
    # Returns the argparse.Namespace for the command line. Besides the
    # per-subcommand options, it always has a 'cmd_fn' attribute holding the
    # Maintainers method that implements the selected subcommand. Exits with
    # the usage text if no subcommand was given.

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=__doc__, allow_abbrev=False)

    parser.add_argument(
        "-m", "--maintainers",
        metavar="MAINTAINERS_FILE",
        help="Maintainers file to load. If not specified, MAINTAINERS.yml in "
             "the top-level repository directory is used, and must exist. "
             "Paths in the maintainers file will always be taken as relative "
             "to the top-level directory.")

    subparsers = parser.add_subparsers(
        help="Available commands (each has a separate --help text)")

    # 'path' subcommand
    path_parser = subparsers.add_parser(
        "path",
        help="List area(s) for paths")
    path_parser.add_argument(
        "paths",
        metavar="PATH",
        nargs="*",
        help="Path to list areas for")
    path_parser.set_defaults(cmd_fn=Maintainers._path_cmd)

    # 'commits' subcommand
    commits_parser = subparsers.add_parser(
        "commits",
        help="List area(s) for commit range")
    commits_parser.add_argument(
        "commits",
        metavar="COMMIT_RANGE",
        nargs="*",
        help="Commit range to list areas for (default: HEAD~..)")
    commits_parser.set_defaults(cmd_fn=Maintainers._commits_cmd)

    # 'list' subcommand
    list_parser = subparsers.add_parser(
        "list",
        help="List files in areas")
    list_parser.add_argument(
        "area",
        metavar="AREA",
        nargs="?",
        # Non-orphaned files are the ones that belong to some area; the
        # previous wording described orphaned files instead
        help="Name of area to list files in. If not specified, all "
             "non-orphaned files are listed (all files that appear in at "
             "least one area).")
    list_parser.set_defaults(cmd_fn=Maintainers._list_cmd)

    # 'areas' subcommand
    areas_parser = subparsers.add_parser(
        "areas",
        help="List areas and maintainers")
    areas_parser.add_argument(
        "maintainer",
        metavar="MAINTAINER",
        nargs="?",
        help="List all areas maintained by maintainer.")

    # Optional filters on the listed areas
    areas_parser.add_argument(
        "--without-maintainers",
        action="store_true",
        help="Exclude areas that have maintainers")
    areas_parser.add_argument(
        "--without-collaborators",
        action="store_true",
        help="Exclude areas that have collaborators")

    areas_parser.set_defaults(cmd_fn=Maintainers._areas_cmd)

    # 'orphaned' subcommand
    orphaned_parser = subparsers.add_parser(
        "orphaned",
        help="List orphaned files (files that do not appear in any area)")
    orphaned_parser.add_argument(
        "path",
        metavar="PATH",
        nargs="?",
        help="Limit to files under PATH")
    orphaned_parser.set_defaults(cmd_fn=Maintainers._orphaned_cmd)

    # 'count' subcommand
    count_parser = subparsers.add_parser(
        "count",
        help="Count areas, unique maintainers, and / or unique collaborators")
    count_parser.add_argument(
        "-a",
        "--count-areas",
        action="store_true",
        help="Count the number of areas")
    count_parser.add_argument(
        "-c",
        "--count-collaborators",
        action="store_true",
        help="Count the number of unique collaborators")
    count_parser.add_argument(
        "-n",
        "--count-maintainers",
        action="store_true",
        help="Count the number of unique maintainers")
    count_parser.add_argument(
        "-o",
        "--count-unmaintained",
        action="store_true",
        help="Count the number of unmaintained areas")
    count_parser.set_defaults(cmd_fn=Maintainers._count_cmd)

    args = parser.parse_args()
    if not hasattr(args, "cmd_fn"):
        # Called without a subcommand
        sys.exit(parser.format_usage().rstrip())

    return args
165
166
class Maintainers:
    """
    Represents the contents of a maintainers YAML file.

    These attributes are available:

    areas:
        A dictionary that maps area names to Area instances, for all areas
        defined in the maintainers file

    filename:
        The path to the maintainers file
    """
    def __init__(self, filename=None):
        """
        Creates a Maintainers instance.

        filename (default: None):
            Path to the maintainers file to parse. If None, or if the path
            does not exist, MAINTAINERS.yml in the top-level directory of the
            Git repository is used, and must exist.

        Raises MaintainersError if the maintainers file is malformed, and
        GitError if the Git top-level directory is needed but can't be
        determined.
        """
        if filename is not None and pathlib.Path(filename).exists():
            self.filename = pathlib.Path(filename)
            self._toplevel = self.filename.parent
        else:
            # Also reached when 'filename' was given but does not exist
            self._toplevel = pathlib.Path(_git("rev-parse", "--show-toplevel"))
            self.filename = self._toplevel / "MAINTAINERS.yml"

        self.areas = {}
        for area_name, area_dict in _load_maintainers(self.filename).items():
            area = Area()
            area.name = area_name
            area.status = area_dict.get("status")
            area.maintainers = area_dict.get("maintainers", [])
            area.collaborators = area_dict.get("collaborators", [])
            area.inform = area_dict.get("inform", [])
            area.labels = area_dict.get("labels", [])
            area.tests = area_dict.get("tests", [])
            area.tags = area_dict.get("tags", [])
            area.description = area_dict.get("description")

            # area._match_fn(path) tests if the path matches files and/or
            # files-regex
            area._match_fn = \
                _get_match_fn(area_dict.get("files"),
                              area_dict.get("files-regex"))

            # Like area._match_fn(path), but for files-exclude and
            # files-regex-exclude
            area._exclude_match_fn = \
                _get_match_fn(area_dict.get("files-exclude"),
                              area_dict.get("files-regex-exclude"))

            self.areas[area_name] = area

    def path2areas(self, path):
        """
        Returns a list of Area instances for the areas that contain 'path',
        taken as relative to the current directory
        """
        # Make directory paths end in '/' so that foo/bar matches foo/bar/.
        # Skip this check in _contains() itself, because the isdir() makes it
        # twice as slow in cases where it's not needed.
        is_dir = os.path.isdir(path)

        # Make 'path' relative to the repository root and normalize it.
        # normpath() would remove a trailing '/', so we add it afterwards.
        path = os.path.normpath(os.path.join(
            os.path.relpath(os.getcwd(), self._toplevel),
            path))

        if is_dir:
            path += "/"

        return [area for area in self.areas.values()
                if area._contains(path)]

    def commits2areas(self, commits):
        """
        Returns a set() of Area instances for the areas that contain files that
        are modified by the commit range in 'commits'. 'commits' could be e.g.
        "HEAD~..", to inspect the tip commit
        """
        res = set()
        # Final '--' is to make sure 'commits' is interpreted as a commit range
        # rather than a path. That might give better error messages.
        for path in _git("diff", "--name-only", commits, "--").splitlines():
            res.update(self.path2areas(path))
        return res

    def __repr__(self):
        return "<Maintainers for '{}'>".format(self.filename)

    #
    # Command-line subcommands
    #

    def _path_cmd(self, args):
        # 'path' subcommand implementation. Prints the areas that contain
        # the paths in args.paths, followed by the paths that are in no area.

        # Validate all paths up front, so that nothing is printed if any of
        # them is bad
        for path in args.paths:
            if not os.path.exists(path):
                _serr("'{}': no such file or directory".format(path))

        res = set()
        orphaned = []
        for path in args.paths:
            areas = self.path2areas(path)
            res.update(areas)
            if not areas:
                orphaned.append(path)

        _print_areas(res)
        if orphaned:
            if res:
                print()
            print("Orphaned paths (not in any area):\n" + "\n".join(orphaned))

    def _commits_cmd(self, args):
        # 'commits' subcommand implementation. Prints the areas touched by
        # the commit ranges in args.commits.

        commits = args.commits or ("HEAD~..",)
        _print_areas({area for commit_range in commits
                           for area in self.commits2areas(commit_range)})

    def _areas_cmd(self, args):
        # 'areas' subcommand implementation. Prints a table of areas with
        # their maintainers and collaborators, optionally limited to areas
        # maintained by args.maintainer and/or filtered by the --without-*
        # options.

        wanted_maintainer = args.maintainer
        # getattr() keeps this usable with argument objects that lack the
        # filter options
        skip_maintained = getattr(args, "without_maintainers", False)
        skip_with_collabs = getattr(args, "without_collaborators", False)

        table = []
        for area in self.areas.values():
            if skip_maintained and area.maintainers:
                continue
            if skip_with_collabs and area.collaborators:
                continue
            if wanted_maintainer and \
               wanted_maintainer not in area.maintainers:
                continue

            # One person per line within a cell, empty cell if none
            table.append([
                area.name,
                "\n".join(area.maintainers or ()),
                "\n".join(area.collaborators or ())
            ])

        if table:
            print(tabulate(
                table,
                headers=["Area", "Maintainers", "Collaborators"],
                tablefmt="grid",
                stralign="left",
                disable_numparse=True
            ))

    def _count_cmd(self, args):
        # 'count' subcommand implementation. Prints the requested counts,
        # or all of them if no specific count was requested.

        show_all = not (args.count_areas or args.count_collaborators or
                        args.count_maintainers or args.count_unmaintained)

        unmaintained = 0
        collaborators = set()
        maintainers = set()

        for area in self.areas.values():
            if area.status == "maintained":
                maintainers.update(area.maintainers)
            elif area.status in ("odd fixes", "unmaintained"):
                # 'odd fixes' areas have always been counted here. Areas
                # explicitly marked 'unmaintained' must be counted as well.
                unmaintained += 1
            collaborators.update(area.collaborators)

        if show_all or args.count_areas:
            print('{:14}\t{}'.format('areas:', len(self.areas)))
        if show_all or args.count_maintainers:
            print('{:14}\t{}'.format('maintainers:', len(maintainers)))
        if show_all or args.count_collaborators:
            print('{:14}\t{}'.format('collaborators:', len(collaborators)))
        if show_all or args.count_unmaintained:
            print('{:14}\t{}'.format('unmaintained:', unmaintained))

    def _list_cmd(self, args):
        # 'list' subcommand implementation. Prints the files in args.area,
        # or the files that appear in some area if args.area is None.

        if args.area is None:
            areas = self.areas.values()
        else:
            area = self.areas.get(args.area)
            if area is None:
                _serr("'{}': no such area defined in '{}'"
                      .format(args.area, self.filename))
            areas = (area,)

        for path in _ls_files():
            if any(area._contains(path) for area in areas):
                print(path)

    def _orphaned_cmd(self, args):
        # 'orphaned' subcommand implementation. Prints the files (under
        # args.path, if given) that do not appear in any area.

        if args.path is not None and not os.path.exists(args.path):
            _serr("'{}': no such file or directory".format(args.path))

        for path in _ls_files(args.path):
            if not any(area._contains(path)
                       for area in self.areas.values()):
                print(path)
395
396
class Area:
    """
    Represents an entry for an area in MAINTAINERS.yml.

    These attributes are available:

    name:
        The name of the area, i.e. its key in the maintainers file. None
        until assigned.

    status:
        The status of the area, as a string. None if the area has no 'status'
        key. See MAINTAINERS.yml.

    maintainers:
        List of maintainers. Empty if the area has no 'maintainers' key.

    collaborators:
        List of collaborators. Empty if the area has no 'collaborators' key.

    inform:
        List of people to inform on pull requests. Empty if the area has no
        'inform' key.

    labels:
        List of GitHub labels for the area. Empty if the area has no 'labels'
        key.

    tests:
        List from the 'tests' key. Empty if the area has no 'tests' key.

    tags:
        List from the 'tags' key. Empty if the area has no 'tags' key.

    description:
        Text from 'description' key, or None if the area has no 'description'
        key
    """
    def __init__(self):
        # Start out like an area entry without any keys, so that a freshly
        # created instance can be printed and queried before (or without)
        # being filled in
        self.name = None
        self.status = None
        self.maintainers = []
        self.collaborators = []
        self.inform = []
        self.labels = []
        self.tests = []
        self.tags = []
        self.description = None

        # Match functions for files/files-regex and for
        # files-exclude/files-regex-exclude. None means "matches nothing".
        self._match_fn = None
        self._exclude_match_fn = None

    def _contains(self, path):
        # Returns True if the area contains 'path', and False otherwise.
        # A path is contained if it matches the area's files/files-regex
        # and is not matched by files-exclude/files-regex-exclude.

        if not (self._match_fn and self._match_fn(path)):
            return False

        return not (self._exclude_match_fn and self._exclude_match_fn(path))

    def __repr__(self):
        return "<Area {}>".format(self.name)
433
434
def _print_areas(areas):
    # Prints a summary of each Area in 'areas', sorted by area name and
    # separated by blank lines

    template = ("{}\n"
                "\tstatus: {}\n"
                "\tmaintainers: {}\n"
                "\tcollaborators: {}\n"
                "\tinform: {}\n"
                "\tlabels: {}\n"
                "\ttests: {}\n"
                "\ttags: {}\n"
                "\tdescription: {}")

    by_name = sorted(areas, key=lambda area: area.name)
    for index, area in enumerate(by_name):
        if index > 0:
            # Blank line between consecutive areas
            print()

        list_fields = (area.maintainers, area.collaborators, area.inform,
                       area.labels, area.tests, area.tags)
        joined = [", ".join(field) for field in list_fields]

        print(template.format(area.name, area.status, *joined,
                              area.description or ""))
460
461
# Splits a glob into bracket expressions ([...]) and single characters
_GLOB_TOKEN_RE = re.compile(r"\[[^\]]+\]|.", re.DOTALL)


def _glob_to_regex(glob):
    # Returns a regex fragment equivalent to the glob 'glob'. '*' and '?'
    # never match '/'. Bracket expressions are passed through unchanged, and
    # every other character is escaped so that it only matches itself (so
    # that e.g. 'c++' or 'foo(bar)' in a path works).

    def translate(match):
        token = match.group()
        if token == "*":
            return "[^/]*"
        if token == "?":
            return "[^/]"
        if len(token) > 1:
            # Bracket expression, already valid as a regex character class
            return token
        return re.escape(token)

    return _GLOB_TOKEN_RE.sub(translate, glob)


def _get_match_fn(globs, regexes):
    # Constructs a single regex that tests for matches against the globs in
    # 'globs' and the regexes in 'regexes'. Parts are joined with '|' (OR).
    # Returns the search() method of the compiled regex.
    #
    # Returns None if there are neither globs nor regexes, which should be
    # interpreted as no match.

    if not (globs or regexes):
        return None

    regex = ""

    if globs:
        glob_regexes = []
        for glob in globs:
            glob_regex = _glob_to_regex(glob)

            if not glob.endswith("/"):
                # Require a full match for globs that don't end in /.
                # Globs that end in / match everything below the directory.
                glob_regex += "$"

            glob_regexes.append(glob_regex)

        # The glob regexes must anchor to the beginning of the path, since we
        # return search(). (?:) is a non-capturing group.
        regex += "^(?:{})".format("|".join(glob_regexes))

    if regexes:
        if regex:
            regex += "|"
        regex += "|".join(regexes)

    return re.compile(regex).search
498
499
def _load_maintainers(path):
    # Parses the maintainers file at 'path' and runs checks on the result.
    # Returns the parsed contents as plain Python dicts/lists/etc., mirroring
    # the structure of the file. Raises MaintainersError if the file is not
    # valid YAML or fails the checks.

    with open(path, encoding="utf-8") as maints_file:
        try:
            contents = load(maints_file, Loader=SafeLoader)
        except YAMLError as yaml_error:
            message = "{}: YAML error: {}".format(path, yaml_error)
            raise MaintainersError(message)

        _check_maintainers(path, contents)
        return contents
513
514
def _check_maintainers(maints_path, yaml):
    # Checks the maintainers data in 'yaml', which comes from the maintainers
    # file at maints_path, which is a pathlib.Path instance. Glob patterns
    # are checked against the directory that contains the maintainers file.
    #
    # Returns nothing. Reports the first problem found through _err(), with
    # the filename prepended to the message.

    root = maints_path.parent

    def ferr(msg):
        _err("{}: {}".format(maints_path, msg))  # Prepend the filename

    if not isinstance(yaml, dict):
        ferr("empty or malformed YAML (not a dict)")

    ok_keys = {"status", "maintainers", "collaborators", "inform", "files",
               "files-exclude", "files-regex", "files-regex-exclude",
               "labels", "description", "tests", "tags"}

    ok_status = {"maintained", "odd fixes", "unmaintained", "obsolete"}
    # For messages. Sorted so that the message is the same on every run.
    ok_status_s = ", ".join('"' + s + '"' for s in sorted(ok_status))

    # Keys whose value must be a list of strings. The exclude keys are
    # included so that malformed values are reported here rather than
    # blowing up in glob()/re.compile() further down.
    list_keys = ("maintainers", "collaborators", "inform", "files",
                 "files-exclude", "files-regex", "files-regex-exclude",
                 "labels", "tags", "tests")

    for area_name, area_dict in yaml.items():
        if not isinstance(area_dict, dict):
            ferr("malformed entry for area '{}' (not a dict)"
                 .format(area_name))

        for key in area_dict:
            if key not in ok_keys:
                ferr("unknown key '{}' in area '{}'"
                     .format(key, area_name))

        if "status" in area_dict and \
           area_dict["status"] not in ok_status:
            ferr("bad 'status' key on area '{}', should be one of {}"
                 .format(area_name, ok_status_s))

        if not area_dict.keys() & {"files", "files-regex"}:
            ferr("either 'files' or 'files-regex' (or both) must be specified "
                 "for area '{}'".format(area_name))

        if not area_dict.get("maintainers") and \
           area_dict.get("status") == "maintained":
            ferr("maintained area '{}' with no maintainers".format(area_name))

        for list_name in list_keys:
            if list_name in area_dict:
                lst = area_dict[list_name]
                if not (isinstance(lst, list) and
                        all(isinstance(elm, str) for elm in lst)):
                    ferr("malformed '{}' value for area '{}' -- should "
                         "be a list of strings".format(list_name, area_name))

        for files_key in "files", "files-exclude":
            if files_key in area_dict:
                for glob_pattern in area_dict[files_key]:
                    # This could be changed if it turns out to be too slow,
                    # e.g. to only check non-globbing filenames. The tuple() is
                    # needed due to pathlib's glob() returning a generator.
                    try:
                        paths = tuple(root.glob(glob_pattern))
                    except (ValueError, NotImplementedError) as e:
                        # pathlib rejects e.g. empty and absolute patterns
                        ferr("bad glob pattern '{}' in '{}' in area '{}': {}"
                             .format(glob_pattern, files_key, area_name, e))

                    if not paths:
                        ferr("glob pattern '{}' in '{}' in area '{}' does not "
                             "match any files".format(glob_pattern, files_key,
                                                      area_name))
                    if not glob_pattern.endswith("/"):
                        if all(path.is_dir() for path in paths):
                            ferr("glob pattern '{}' in '{}' in area '{}' "
                                 "matches only directories, but has no "
                                 "trailing '/'"
                                 .format(glob_pattern, files_key,
                                         area_name))

        for files_regex_key in "files-regex", "files-regex-exclude":
            if files_regex_key in area_dict:
                for regex in area_dict[files_regex_key]:
                    try:
                        re.compile(regex)
                    except re.error as e:
                        ferr("bad regular expression '{}' in '{}' in "
                             "'{}': {}".format(regex, files_regex_key,
                                               area_name, e.msg))

        if "description" in area_dict and \
           not isinstance(area_dict["description"], str):
            ferr("malformed 'description' value for area '{}' -- should be a "
                 "string".format(area_name))
598
599
def _git(*args):
    # Helper for running a Git command. Returns the rstrip()ed stdout output,
    # decoded as UTF-8. Called like _git("diff"). Reports failures (Git not
    # found, Git could not be started, non-zero exit status) through
    # _giterr().

    argv = ("git",) + args
    printable = " ".join(map(shlex.quote, argv))  # For error messages

    try:
        completed = subprocess.run(
            argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        _giterr("git executable not found (when running '{}'). Check that "
                "it's in listed in the PATH environment variable"
                .format(printable))
    except OSError as os_error:
        _giterr("error running '{}': {}".format(printable, os_error))

    if completed.returncode != 0:
        _giterr("error running '{}'\n\nstdout:\n{}\nstderr:\n{}".format(
            printable,
            completed.stdout.decode("utf-8"),
            completed.stderr.decode("utf-8")))

    output = completed.stdout.decode("utf-8")
    return output.rstrip()
624
625
def _ls_files(path=None):
    # Returns the output lines of 'git ls-files' as a list, with 'path'
    # passed along as an argument if it is not None

    extra_args = () if path is None else (path,)
    listing = _git("ls-files", *extra_args)
    return listing.splitlines()
631
632
def _err(msg):
    # Reports a maintainers-file problem by raising MaintainersError with
    # 'msg' as the message
    error = MaintainersError(msg)
    raise error
635
636
def _giterr(msg):
    # Reports a Git problem by raising GitError with 'msg' as the message
    error = GitError(msg)
    raise error
639
640
def _serr(msg):
    # Exits the script with an error message prefixed by the program name.
    # Only meant for when get_maintainer.py is run as a script; library code
    # should raise exceptions instead.
    program = sys.argv[0]
    text = "{}: error: {}".format(program, msg)
    sys.exit(text)
645
646
class MaintainersError(Exception):
    """
    Exception raised for MAINTAINERS.yml-related errors
    """
649
650
class GitError(Exception):
    """
    Exception raised for Git-related errors
    """
653
654
# Run the command-line interface only when executed as a script, so that
# importing this file as a library has no side effects
if __name__ == "__main__":
    _main()
657