1#!/usr/bin/env python3
2#
3# Copyright 2025 The Hafnium Authors.
4#
5# Use of this source code is governed by a BSD-style
6# license that can be found in the LICENSE file or at
7# https://opensource.org/licenses/BSD-3-Clause.
8
9"""
10shrinkwrap_utils.py
11
12Helper class for managing Shrinkwrap integration within the Hafnium test framework.
13
14This module provides the `ShrinkwrapManager` class, which handles:
15  - Setting up the Shrinkwrap environment.
16  - Generating dynamic YAML overlay from runtime-evaluated `rtvars` and `params`
17  - Invoking Shrinkwrap `build` once per test session
18  - Running Shrinkwrap for each test using the generated dynamic overlay.
19
20Used by FVP-based drivers (e.g. FvpDriver, FvpDriverSPMC, etc.) to simplify
21and reuse Shrinkwrap logic.
22"""
23
24import os
25import subprocess
26import yaml
27import shutil
28import logging
29import re
30
31# Inherits the config settings from global logger in hftest.py
32logger = logging.getLogger(__name__)
33
34VM_PARAM_OFFSET = 5
35INITRD_PARAM_OFFSET = 8
36SP_PARAM_OFFSET = 10
37
38# Maps FVP driver configuration keys to their corresponding static Shrinkwrap overlays
39SHRINKWRAP_STATIC_OVERLAY_MAP = {
40    "hypervisor": ["fvp_hf_hypervisor_preloaded.yaml"],
41    "spmc": ["fvp_hf_spmc_preloaded.yaml"],
42    "hypervisor_and_spmc": ["fvp_hf_hypervisor_and_spmc_preloaded.yaml"],
43    "el3_spmc": ["fvp_hf_el3spmc_preloaded.yaml"],
44    "hypervisor_el3_spmc": ["fvp_hf_hypervisor_el3spmc_preloaded.yaml"]
45}
46
47class ShrinkwrapManager:
48
49    CONFIG_SUBDIR = os.path.join("tools", "shrinkwrap", "configs", "kokoro")
50    FVP_BASE_PACKAGE = "FVP_Base_RevC-2xAEMvA-hafnium.yaml"
51    DEFAULT_DYNAMIC_OVERLAY = "fvp_hf_dynamic_overlay.yaml"
52    _fvp_package_built = False
53
54    def __init__(self, hafnium_root):
55        self.hafnium_root = hafnium_root
56        self.env = self.setup_env()
57
58    def _get_config_dir(self):
59        """Returns the absolute path to Shrinkwrap yaml configs directory."""
60        return os.path.join(self.hafnium_root, self.CONFIG_SUBDIR)
61
62    def setup_env(self):
63        """
64        Sets up and returns the environment required for Shrinkwrap.
65        Returns:
66            dict: Updated environment dictionary for Shrinkwrap use.
67        """
68        env = os.environ.copy()
69
70        # Specify explicit environment variables (e.g., CI, Docker), fallback to /src/out
71        default_workspace = os.path.join(self.hafnium_root, "out")
72        default_config_dir = self._get_config_dir()
73
74        # Set Shrinkwrap-specific environment variables.
75        env["SHRINKWRAP_CONFIG"] = env.get("SHRINKWRAP_CONFIG", default_config_dir)
76        env["SHRINKWRAP_BUILD"] = env.get("SHRINKWRAP_BUILD", os.path.join(default_workspace, "build"))
77        env["SHRINKWRAP_PACKAGE"] = env.get("SHRINKWRAP_PACKAGE", os.path.join(default_workspace, "package"))
78
79        # Validate output directories exist (skip errors if in read-only FS)
80        try:
81            os.makedirs(env["SHRINKWRAP_BUILD"], exist_ok=True)
82            os.makedirs(env["SHRINKWRAP_PACKAGE"], exist_ok=True)
83        except OSError:
84            pass # Likely running in read-only root filesystem
85
86        # Add Shrinkwrap's CLI path to PATH if not already present
87        shrinkwrap_binary = os.path.join(self.hafnium_root, "third_party", "shrinkwrap", "shrinkwrap")
88        if shrinkwrap_binary not in env.get("PATH", ""):
89            env["PATH"] = shrinkwrap_binary + os.pathsep + env.get("PATH", "")
90
91        # Validate shrinkwrap CLI exists
92        try:
93            resolved_path = self.get_shrinkwrap_cmd(env)
94        except RuntimeError:
95            raise
96
97        # Print the Shrinkwrap environment variables once for every test session(DEBUG only)
98        if not getattr(ShrinkwrapManager.setup_env, "_has_printed", False):
99            logger.debug("Shrinkwrap environment variables set:")
100            for key in ["SHRINKWRAP_CONFIG", "SHRINKWRAP_BUILD", "SHRINKWRAP_PACKAGE", "PATH"]:
101                logger.debug("  %s = %s", key, env[key])
102            ShrinkwrapManager.setup_env._has_printed = True
103        return env
104
105    @staticmethod
106    def add_multi_param_with_spacing(base_key, values, offset=5):
107        """
108        Generate Shrinkwrap-compliant spaced param keys for duplicate base keys.
109        """
110        return {(" " * (i + offset)) + base_key: val for i, val in enumerate(values)}
111
112    def ensure_config_dir(self):
113        """Ensure the Shrinkwrap kokoro config directory exists."""
114        os.makedirs(self._get_config_dir(), exist_ok=True)
115
116    def get_dynamic_overlay_path(self, filename="fvp_hf_dynamic_overlay.yaml"):
117        """Return the absolute path to the overlay YAML file in the kokoro config directory."""
118        self.ensure_config_dir()
119        return os.path.join(self._get_config_dir(), filename or self.DEFAULT_DYNAMIC_OVERLAY)
120
121    def write_overlay_yaml(self, overlay_path, rtvars, new_params=None, fvp_name=None):
122        """
123        Write rtvars and optional params to a YAML overlay file.
124        Args:
125            overlay_path (str): Full path to the overlay YAML file.
126            rtvars (dict): Dictionary of runtime variables.
127            params (dict, optional): Optional FVP parameters.
128            run_name (str, optional): Optional value for run.name.
129        """
130        overlay = {"run": {"rtvars": rtvars}}
131        if new_params:
132            overlay["run"]["params"] = new_params
133        if fvp_name:
134            overlay["run"]["name"] = fvp_name
135
136        with open(overlay_path, "w") as f:
137            yaml.safe_dump(overlay, f, sort_keys=False)
138
139    def get_shrinkwrap_cmd(self, env):
140        """Check for Shrinkwrap CLI availability and return its path."""
141        path = shutil.which("shrinkwrap", path=env.get("PATH", ""))
142        if not path:
143            raise RuntimeError(
144                "'shrinkwrap' CLI not found in PATH. "
145                "Please ensure it's built and available at out/shrinkwrap/shrinkwrap."
146            )
147        return path
148
149    def build_fvp_package_once(self, overlays):
150        """
151        Builds the Shrinkwrap FVP package using static YAML overlays,
152        if not already built.
153        This is a one-time setup performed per test session (i.e., per hftest.py invocation),
154        and combines the base FVP model with test-specific static configuration overlays.
155        """
156        if getattr(self.__class__, "_fvp_package_built", False):
157            return
158
159        config_dir = self._get_config_dir()
160
161        # NOTE: Do NOT move or modify the build_cmd initialization block below.
162        # The base command (with FVP package) MUST be passed first,
163        # followed by all --overlay options (including loop-based and conditional ones).
164        # Changing this order may break Shrinkwrap's build semantics.
165        build_cmd = [
166            "shrinkwrap", "--runtime", "null", "build",
167            os.path.join(config_dir, "FVP_Base_RevC-2xAEMvA-hafnium.yaml")
168        ]
169        for overlay in overlays:
170            build_cmd += ["--overlay",  os.path.join(config_dir, overlay)]
171
172        logger.debug("Shrinkwrap BUILD CMD:\n%s", " ".join(build_cmd))
173        self.__class__._fvp_package_built = True
174
175        try:
176            subprocess.run(build_cmd, env=self.env, check=True,
177                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
178            logger.debug("\u2705 Shrinkwrap build succeeded")
179        except subprocess.CalledProcessError as e:
180            raise RuntimeError("\u274C Shrinkwrap build step failed") from e
181
182    def run_fvp(self, run_state, execute_logged_fn, is_long_running, debug,
183                show_output, cov_plugin, dynamic_overlay=None):
184        """
185        Executes Shrinkwrap 'run' using the prebuilt FVP package,
186        overlaying the dynamically generated runtime YAML  and timeouts.
187        """
188        config_dir = self._get_config_dir()
189        show_output = debug or show_output
190        dynamic_overlay = dynamic_overlay or self.DEFAULT_DYNAMIC_OVERLAY
191
192        time_limit = "40s"
193        if cov_plugin is None:
194            time_limit = "150s" if is_long_running else time_limit
195        else:
196            time_limit = "300s" if is_long_running else "80s"
197
198        # NOTE: Keep this order — timeout first (if enabled), then Shrinkwrap
199        # run with FVP base package and dynamic overlay.
200        # Reordering may break execution or cause config issues.
201        run_cmd = []
202        if not show_output:
203            run_cmd += ["timeout", "--foreground", time_limit]
204
205        run_cmd += [
206            "shrinkwrap", "--runtime", "null", "run", "FVP_Base_RevC-2xAEMvA-hafnium.yaml"
207        ]
208        if debug:
209            run_cmd += ["--overlay", os.path.join(config_dir, "fvp_hf_debug.yaml")]
210        if cov_plugin is not None:
211            run_cmd += ["--overlay", os.path.join(config_dir, "fvp_hf_cov_plugin.yaml")]
212        if dynamic_overlay:
213            run_cmd += ["--overlay", os.path.join(config_dir, dynamic_overlay)]
214
215        logger.debug("Shrinkwrap RUN CMD:\n%s", " ".join(run_cmd))
216
217        self.log_resolved_fvp_command(run_cmd, run_state.log_path)
218
219        try:
220            execute_logged_fn(run_state, run_cmd, env=self.env)
221            logger.debug("\u2705 Shrinkwrap run successful")
222        except subprocess.CalledProcessError as e:
223            raise RuntimeError("\u274C Shrinkwrap run failed") from e
224
225    def log_resolved_fvp_command(self, run_cmd, log_path):
226        """
227        Extracts the actual FVP execution command line from Shrinkwrap's --dry-run
228        output and appends it to the test's UART log for traceability.
229
230        Note:
231        Shrinkwrap's `--dry-run` output includes more than just the FVP command-
232        it may contain shell boilerplate, git operations etc.
233        This function filters that output to extract only the actual FVP model
234        command for logging.
235        """
236        dry_run_cmd = run_cmd + ["--dry-run"]
237
238        try:
239            result = subprocess.run(dry_run_cmd, env=self.env, check=True,
240                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
241            output_lines = result.stdout.decode().splitlines()
242            fvp_cmd = []
243            capture_fvp_cmd = False
244
245            for line in output_lines:
246                trimmed_line = line.strip()
247
248                # Regex Logic:
249                # ^/ : Line start with a ( / )
250                #.*/FVP_   : Somewhere in the path, it must include 'FVP_' (e.g., FVP_Base_RevC...)
251                # .*  : can have additionl data after that FVP name
252                if re.match(r"^/.*/FVP_.*", trimmed_line):
253                    capture_fvp_cmd = True  # Start capturing
254
255                if capture_fvp_cmd:
256                    # Stop capturing if we hit an unrelated command or blank (optional)
257                    if (trimmed_line.startswith("#") or
258                        trimmed_line.startswith("git ") or
259                        trimmed_line == ""):
260                        break
261                    fvp_cmd.append(line)
262
263            if fvp_cmd:
264                with open(log_path, "a") as f:
265                    f.write("\n # SHRINKWRAP Resolved FVP Command:\n")
266                    for line in fvp_cmd:
267                        f.write(line + "\n")
268                    f.write("\n")
269            else:
270                logger.warning("No FVP command found in Shrinkwrap --dry-run output.")
271
272        except subprocess.CalledProcessError as e:
273            logger.warning("Shrinkwrap dry-run failed while logging FVP command: %s", e)
274