#!/usr/bin/env python3
"""Build the scenario JSON schemas and the board summary for a board XML."""
# NOTE(review): presumably set so the relative import below resolves when this
# file is executed outside its package (e.g. under pyodide) - confirm.
__package__ = 'configurator.pyodide'

import json
import logging
import re
import os
from copy import deepcopy

import elementpath
from defusedxml.lxml import fromstring
from bs4 import BeautifulSoup

from . import convert_result, nuc11_board, scenario_json_schema, nuc11_board_path


def get_dynamic_scenario(board):
    """
    Resolve the dynamic enums of the scenario JSON schema against a board and
    return one schema per (tab type, form type) pair.

    :type board: str
    :param board: board xml text
    :return: ``{tab_type: {form_type: schema}}`` where each schema is a deep
        copy of the resolved schema whose top-level ``type``, ``required`` and
        ``properties`` are taken from ``definitions[tab_type + form_type]``
    """
    board_xml = fromstring(board)

    def get_enum(source, options, option_names, obj_type):
        """Return de-duplicated ``(value, name)`` pairs selected from *source*.

        *options* and *option_names* are passed to ``elementpath.select`` to
        pick the values and their display names respectively.
        """
        # NOTE(review): falsy results are dropped from each selection
        # independently, so values and names could become misaligned if only
        # one side yields an empty item - confirm the selectors never do that.
        values = [str(x) for x in elementpath.select(source, options) if x]
        names = [str(x) for x in elementpath.select(source, option_names) if x]

        # dict.fromkeys removes duplicate pairs while keeping document order;
        # a set would hand back the pairs in an arbitrary, run-dependent order.
        elements = list(dict.fromkeys(zip(values, names)))
        if not elements:
            # Always provide one entry so callers can unpack zip(*elements).
            elements = [('', '')]

        # TODO: Add more converters if needed
        enum_type_convert = {'integer': lambda x: int(x) if x else 0}
        if obj_type in enum_type_convert:
            convert = enum_type_convert[obj_type]
            elements = [(convert(value), name) for value, name in elements]
        return elements

    def dynamic_enum(**enum_setting):
        """Evaluate one dynamic-enum setting; return ``zip(values, names)``."""
        # value from env: names in the schema are mapped to local objects
        registry = {"get_enum": get_enum, "board_xml": board_xml}
        function = registry[enum_setting['function']]
        source = registry[enum_setting['source']]

        # value from given
        selector = enum_setting['selector']
        name_selector = enum_setting['name-selector']
        sorted_func = enum_setting['sorted']
        obj_type = enum_setting['type']

        # get enum data
        enum = function(source, selector, name_selector, obj_type)
        if sorted_func:
            # SECURITY NOTE: the sort-key expression comes from the schema text
            # and is executed with eval(); this is only acceptable while the
            # schema is a trusted, project-provided resource.
            fn = eval(sorted_func)
            enum = sorted(enum, key=lambda x: fn(x[0]))
        return zip(*enum)

    def dynamic_enum_apply(obj):
        """``json.loads`` object hook that expands ``dynamicEnum`` settings."""
        # get json schema enum obj
        enum_setting = obj.get('enum')
        if not isinstance(enum_setting, dict):
            return obj

        # check enum obj type
        if enum_setting['type'] == 'dynamicEnum':
            # The enclosing property's JSON type selects the value converter.
            enum_setting['type'] = obj.get('type', '')
            # replace json schema obj enum field data
            enum, enum_names = dynamic_enum(**enum_setting)
            obj['enum'] = enum
            obj['enumNames'] = enum_names
        return obj

    data = json.loads(scenario_json_schema, object_hook=dynamic_enum_apply)

    form_schemas = {}
    tab_types = ['HV', 'PreLaunchedVM', 'ServiceVM', 'PostLaunchedVM']
    form_types = ['BasicConfigType', 'AdvancedConfigType']
    for tab_type in tab_types:
        form_schemas[tab_type] = {}
        for form_type in form_types:
            # Each form gets its own copy so promoting its definition to the
            # top level does not affect the other forms.
            form_schema = deepcopy(data)
            definition = form_schema['definitions'][f'{tab_type}{form_type}']
            form_schema['type'] = definition['type']
            # 'required' is optional in a definition; default to an empty list.
            form_schema['required'] = definition.get('required', [])
            form_schema['properties'] = definition['properties']
            form_schemas[tab_type][form_type] = form_schema

    return form_schemas


def get_cat_info(soup):
    """
    Collect the cache regions of a board that are shared and carry a ``CAT``
    element.

    :param soup: parsed board xml (BeautifulSoup document)
    :return: list of dicts with the keys ``id``, ``level``, ``type``,
        ``cache_size``, ``capacity_mask_length`` and ``processors``, ordered by
        level (highest first) and then by numeric id (parsed as hexadecimal)
    """
    # Map each thread's id attribute to the text of its cpu_id element.
    threads = {
        thread.attrs['id']: thread.select_one('cpu_id').text
        for thread in soup.select('core thread')
    }

    cat_info = []
    for cache in soup.select('caches cache'):
        cache_level = int(cache.attrs['level'])

        # ignore cache_level 1 and single core cache region
        if cache_level == 1:
            continue
        processors = cache.select('processors processor')
        if len(processors) <= 1:
            continue
        # ignore no CAT capability cache region
        if cache.select_one('#CAT') is None:
            continue

        capacity_mask_length = int(
            cache.select_one('capability capacity_mask_length').text
        )

        cat_info.append({
            'id': cache.attrs['id'],
            'level': cache_level,
            'type': cache.attrs['type'],
            'cache_size': int(cache.select_one('cache_size').text),
            'capacity_mask_length': capacity_mask_length,
            # Processor entries hold thread ids; translate them to cpu ids.
            'processors': sorted(
                int(threads[processor.text]) for processor in processors
            ),
        })

    # Highest cache level first; within one level, ascending hexadecimal id.
    cat_info.sort(key=lambda x: (-x['level'], int(x['id'], 16)))
    return cat_info


def get_board_info(board, path):
    """
    Summarise a board xml.

    :type board: str
    :param board: board xml text
    :param path: path the board was loaded from; only used to derive the name
    :return: dict with the keys ``name``, ``content``, ``CAT_INFO``,
        ``BIOS_INFO`` and ``BASE_BOARD_INFO``
    """
    soup = BeautifulSoup(board, 'xml')
    # Workaround: The pyodide thinks it runs under os.name == posix.
    # So os.path.basename will not work on windows.
    # Here we replace all '\' with '/' to make it work (most of the time).
    try:
        path = path.replace('\\', '/')
        fname = os.path.basename(path)
        basename, _ = os.path.splitext(fname)
        # Normalise the file name so that it always ends with '.board.xml'.
        if basename.endswith('.board'):
            board_name = basename + '.xml'
        else:
            board_name = basename + '.board.xml'
    except Exception as e:
        # Best effort: an unusable path must not prevent loading the board.
        logging.warning(e)
        board_name = 'default'

    return {
        'name': board_name,
        'content': board,
        'CAT_INFO': get_cat_info(soup),
        'BIOS_INFO': soup.select_one('BIOS_INFO').text,
        'BASE_BOARD_INFO': soup.select_one('BASE_BOARD_INFO').text
    }


def load_board(board, path):
    """
    Build the scenario schemas and the board summary for *board* and pass
    them through ``convert_result``.

    :type board: str
    :param board: board xml text
    :param path: path the board was loaded from
    """
    result = {
        'scenarioJSONSchema': get_dynamic_scenario(board),
        'boardInfo': get_board_info(board, path)
    }
    return convert_result(result)


def test():
    """Smoke test: load the bundled nuc11 board."""
    load_board(nuc11_board, nuc11_board_path)


main = load_board

if __name__ == '__main__':
    test()