1#!/usr/bin/env python3
2
3"""Analyze the test outcomes from a full CI run.
4
5This script can also run on outcomes from a partial run, but the results are
6less likely to be useful.
7"""
8
9import argparse
10import re
11import sys
12import traceback
13
14import check_test_cases
15
16class Results:
17    """Process analysis results."""
18
19    def __init__(self):
20        self.error_count = 0
21        self.warning_count = 0
22
23    @staticmethod
24    def log(fmt, *args, **kwargs):
25        sys.stderr.write((fmt + '\n').format(*args, **kwargs))
26
27    def error(self, fmt, *args, **kwargs):
28        self.log('Error: ' + fmt, *args, **kwargs)
29        self.error_count += 1
30
31    def warning(self, fmt, *args, **kwargs):
32        self.log('Warning: ' + fmt, *args, **kwargs)
33        self.warning_count += 1
34
35class TestCaseOutcomes:
36    """The outcomes of one test case across many configurations."""
37    # pylint: disable=too-few-public-methods
38
39    def __init__(self):
40        # Collect a list of witnesses of the test case succeeding or failing.
41        # Currently we don't do anything with witnesses except count them.
42        # The format of a witness is determined by the read_outcome_file
43        # function; it's the platform and configuration joined by ';'.
44        self.successes = []
45        self.failures = []
46
47    def hits(self):
48        """Return the number of times a test case has been run.
49
50        This includes passes and failures, but not skips.
51        """
52        return len(self.successes) + len(self.failures)
53
54class TestDescriptions(check_test_cases.TestDescriptionExplorer):
55    """Collect the available test cases."""
56
57    def __init__(self):
58        super().__init__()
59        self.descriptions = set()
60
61    def process_test_case(self, _per_file_state,
62                          file_name, _line_number, description):
63        """Record an available test case."""
64        base_name = re.sub(r'\.[^.]*$', '', re.sub(r'.*/', '', file_name))
65        key = ';'.join([base_name, description.decode('utf-8')])
66        self.descriptions.add(key)
67
68def collect_available_test_cases():
69    """Collect the available test cases."""
70    explorer = TestDescriptions()
71    explorer.walk_all()
72    return sorted(explorer.descriptions)
73
74def analyze_coverage(results, outcomes):
75    """Check that all available test cases are executed at least once."""
76    available = collect_available_test_cases()
77    for key in available:
78        hits = outcomes[key].hits() if key in outcomes else 0
79        if hits == 0:
80            # Make this a warning, not an error, as long as we haven't
81            # fixed this branch to have full coverage of test cases.
82            results.warning('Test case not executed: {}', key)
83
84def analyze_outcomes(outcomes):
85    """Run all analyses on the given outcome collection."""
86    results = Results()
87    analyze_coverage(results, outcomes)
88    return results
89
90def read_outcome_file(outcome_file):
91    """Parse an outcome file and return an outcome collection.
92
93An outcome collection is a dictionary mapping keys to TestCaseOutcomes objects.
94The keys are the test suite name and the test case description, separated
95by a semicolon.
96"""
97    outcomes = {}
98    with open(outcome_file, 'r', encoding='utf-8') as input_file:
99        for line in input_file:
100            (platform, config, suite, case, result, _cause) = line.split(';')
101            key = ';'.join([suite, case])
102            setup = ';'.join([platform, config])
103            if key not in outcomes:
104                outcomes[key] = TestCaseOutcomes()
105            if result == 'PASS':
106                outcomes[key].successes.append(setup)
107            elif result == 'FAIL':
108                outcomes[key].failures.append(setup)
109    return outcomes
110
111def analyze_outcome_file(outcome_file):
112    """Analyze the given outcome file."""
113    outcomes = read_outcome_file(outcome_file)
114    return analyze_outcomes(outcomes)
115
116def main():
117    try:
118        parser = argparse.ArgumentParser(description=__doc__)
119        parser.add_argument('outcomes', metavar='OUTCOMES.CSV',
120                            help='Outcome file to analyze')
121        options = parser.parse_args()
122        results = analyze_outcome_file(options.outcomes)
123        if results.error_count > 0:
124            sys.exit(1)
125    except Exception: # pylint: disable=broad-except
126        # Print the backtrace and exit explicitly with our chosen status.
127        traceback.print_exc()
128        sys.exit(120)
129
130if __name__ == '__main__':
131    main()
132