1#!/usr/bin/env python3
2__package__ = 'configurator.pyodide'
3
4import json
5import logging
6import re
7import os
8from copy import deepcopy
9
10import elementpath
11from defusedxml.lxml import fromstring
12from bs4 import BeautifulSoup
13
14from . import convert_result, nuc11_board, scenario_json_schema, nuc11_board_path
15
16
def get_dynamic_scenario(board):
    """
    Build the per-tab, per-form scenario JSON schemas for a board.

    The ``scenario_json_schema`` text is parsed and every schema object whose
    ``enum`` field is a ``dynamicEnum`` descriptor gets that field replaced by
    values selected from the board XML; ``enumNames`` receives the matching
    display names.

    :type board: str
    :param board: board xml text
    :rtype: dict
    :return: ``{tab_type: {form_type: schema}}``; each schema is an independent
        deep copy of the resolved schema whose top-level ``type``, ``required``
        and ``properties`` come from ``definitions['<tab_type><form_type>']``
        (``required`` defaults to ``[]`` when the definition has none)
    """
    board_xml = fromstring(board)

    def get_enum(source, options, option_names, obj_type):
        # Evaluate both selectors against the source and pair the results
        # positionally as (value, display name).
        # NOTE(review): falsy items are dropped from each list independently,
        # so an empty entry in only one of them would shift the pairing --
        # confirm the selectors never yield empty items.
        values = [str(x) for x in elementpath.select(source, options) if x]
        names = [str(x) for x in elementpath.select(source, option_names) if x]
        # De-duplicate while keeping first-seen order. A set would leave the
        # option order arbitrary whenever no sort function is configured.
        pairs = list(dict.fromkeys(zip(values, names)))
        if not pairs:
            pairs = [('', '')]
        # TODO: Add more converters if needed
        converters = {'integer': lambda x: int(x) if x else 0}
        convert = converters.get(obj_type)
        if convert is not None:
            pairs = [(convert(value), name) for value, name in pairs]
        return pairs

    def dynamic_enum(**enum_setting):
        # value from env: the descriptor refers to these objects by name
        registry = {"get_enum": get_enum, "board_xml": board_xml}
        function = registry[enum_setting['function']]
        source = registry[enum_setting['source']]
        # value from given
        selector = enum_setting['selector']
        name_selector = enum_setting['name-selector']
        obj_type = enum_setting['type']
        # A descriptor without a 'sorted' entry simply keeps selection order.
        sorted_func = enum_setting.get('sorted')

        # get enum data
        enum = function(source, selector, name_selector, obj_type)
        if sorted_func:
            # NOTE(review): eval() executes the expression stored in the
            # schema. That is only acceptable while the schema text is
            # trusted; never feed user-supplied schemas through here.
            fn = eval(sorted_func)
            enum = sorted(enum, key=lambda x: fn(x[0]))
        # Transpose [(value, name), ...] into (values, names).
        return zip(*enum)

    def dynamic_enum_apply(obj):
        # json object_hook: called for every decoded JSON object.
        enum_setting = obj.get('enum')
        # Only dict-valued enums tagged as dynamicEnum are descriptors; a
        # plain list enum (or a dict without 'type') is left untouched.
        if isinstance(enum_setting, dict) and enum_setting.get('type') == 'dynamicEnum':
            # The converter in get_enum is chosen by the owning object's type.
            enum_setting['type'] = obj.get('type', '')
            # replace json schema obj enum field data
            enum, enum_names = dynamic_enum(**enum_setting)
            obj['enum'] = enum
            obj['enumNames'] = enum_names
        return obj

    data = json.loads(scenario_json_schema, object_hook=dynamic_enum_apply)

    tab_types = ['HV', 'PreLaunchedVM', 'ServiceVM', 'PostLaunchedVM']
    form_types = ['BasicConfigType', 'AdvancedConfigType']
    form_schemas = {}
    for tab_type in tab_types:
        form_schemas[tab_type] = {}
        for form_type in form_types:
            # Each form gets its own copy so promoting the definition to the
            # top level does not leak into the other forms.
            form_schema = deepcopy(data)
            definition = form_schema['definitions'][f'{tab_type}{form_type}']
            form_schema['type'] = definition['type']
            form_schema['required'] = definition.get('required', [])
            form_schema['properties'] = definition['properties']
            form_schemas[tab_type][form_type] = form_schema

    return form_schemas
84
85
def get_cat_info(soup):
    """
    Collect the cache regions of a parsed board document that qualify for CAT.

    A ``caches cache`` element is reported only when its level is not 1, it
    lists more than one ``processors processor`` entry, and it contains an
    element with id ``CAT``.

    :param soup: parsed board xml supporting ``select`` / ``select_one``
    :return: list of dicts with keys ``id``, ``level``, ``type``,
        ``cache_size``, ``capacity_mask_length`` and ``processors`` (sorted
        integer cpu ids), ordered by level descending and then by ``id``
        (parsed as hexadecimal) ascending
    """
    # Map each thread's id attribute to the text of its cpu_id child.
    cpu_id_by_thread = {
        node.attrs['id']: node.select_one('cpu_id').text
        for node in soup.select('core thread')
    }

    regions = []
    for cache in soup.select('caches cache'):
        level = int(cache.attrs['level'])
        # ignore cache_level 1
        if level == 1:
            continue
        # ignore single core cache region
        members = cache.select('processors processor')
        if len(members) <= 1:
            continue
        # ignore no CAT capability cache region
        if cache.select_one('#CAT') is None:
            continue

        mask_length = int(cache.select_one('capability capacity_mask_length').text)
        cpu_ids = sorted(int(cpu_id_by_thread[member.text]) for member in members)
        regions.append({
            'id': cache.attrs['id'],
            'level': level,
            'type': cache.attrs['type'],
            'cache_size': int(cache.select_one('cache_size').text),
            'capacity_mask_length': mask_length,
            'processors': cpu_ids,
        })

    # Highest level first; within a level, ascending by hexadecimal id.
    return sorted(regions, key=lambda region: (-region['level'], int(region['id'], 16)))
118
119
def get_board_info(board, path):
    """
    Summarise a board xml file for the caller.

    :param board: board xml text
    :param path: path the board was loaded from; only its file name is used
    :return: dict with ``name`` (file name normalised to end in
        ``.board.xml``, or ``'default'`` if the name could not be derived),
        ``content`` (the xml text as given), ``CAT_INFO``, ``BIOS_INFO`` and
        ``BASE_BOARD_INFO``
    """
    soup = BeautifulSoup(board, 'xml')

    # Workaround: pyodide reports os.name == posix, so os.path.basename does
    # not split Windows paths. Turning every '\' into '/' first makes it work
    # (most of the time).
    try:
        normalised = path.replace('\\', '/')
        stem = os.path.splitext(os.path.basename(normalised))[0]
        if stem.endswith('.board'):
            suffix = '.xml'
        else:
            suffix = '.board.xml'
        board_name = stem + suffix
    except Exception as e:
        # Best effort: any failure deriving the name falls back to a fixed one.
        logging.warning(e)
        board_name = 'default'

    cat_info = get_cat_info(soup)
    bios_info = soup.select_one('BIOS_INFO').text
    base_board_info = soup.select_one('BASE_BOARD_INFO').text
    return {
        'name': board_name,
        'content': board,
        'CAT_INFO': cat_info,
        'BIOS_INFO': bios_info,
        'BASE_BOARD_INFO': base_board_info,
    }
142
143
def load_board(board, path):
    """
    Produce the scenario schemas and board summary for one board.

    :param board: board xml text
    :param path: path the board was loaded from
    :return: ``convert_result`` applied to a dict with keys
        ``scenarioJSONSchema`` and ``boardInfo``
    """
    scenario_schema = get_dynamic_scenario(board)
    board_info = get_board_info(board, path)
    return convert_result({
        'scenarioJSONSchema': scenario_schema,
        'boardInfo': board_info,
    })
150
151
def test():
    """Run ``load_board`` once on the imported ``nuc11_board`` sample; the result is discarded."""
    load_board(board=nuc11_board, path=nuc11_board_path)
154
155
# Expose the loader under the name ``main`` as well.
# NOTE(review): presumably the name the host looks up -- confirm against the caller.
main = load_board

# When executed directly as a script, run test().
if __name__ == '__main__':
    test()
160