1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Intel Corporation.
4#
5# SPDX-License-Identifier: BSD-3-Clause
6#
7
8class PipelineObject:
9    def __init__(self, **kwargs):
10        self.data = {}
11        for k,v in kwargs.items():
12            self.set(k, v)
13
14    def set(self, tag, data):
15        self.data[tag] = data
16
17    def get(self, tag):
18        return self.data[tag]
19
20    def has(self, tag):
21        return tag in self.data.keys()
22
23    def consume(self, tag):
24        return self.data.pop(tag, None)
25
26    def dump(self):
27        print(self.data)
28
29class PipelineStage:
30    # The following three class variables defines the inputs and outputs of the stage. Each of them can be either a set
31    # or a string (which is interpreted as a unit set)
32
33    consumes = set()        # Data consumed by this stage. Consumed data will be unavailable to later stages.
34    uses = set()            # Data used but not consumed by this stage.
35    provides = set()        # Data provided by this stage.
36
37    def run(self, obj):
38        raise NotImplementedError
39
40class PipelineEngine:
41    def __init__(self, initial_data = []):
42        self.stages = []
43        self.initial_data = set(initial_data)
44        self.available_data = set(initial_data)
45
46    def add_stage(self, stage):
47        consumes = stage.consumes if isinstance(stage.consumes, set) else {stage.consumes}
48        uses = stage.uses if isinstance(stage.uses, set) else {stage.uses}
49        provides = stage.provides if isinstance(stage.provides, set) else {stage.provides}
50
51        all_uses = consumes.union(uses)
52        if not all_uses.issubset(self.available_data):
53            raise Exception(f"Data {all_uses - self.available_data} need by stage {stage.__class__.__name__} but not provided by the pipeline")
54
55        self.stages.append(stage)
56        self.available_data = self.available_data.difference(consumes).union(provides)
57
58    def add_stages(self, stages):
59        for stage in stages:
60            self.add_stage(stage)
61
62    def run(self, obj):
63        for tag in self.initial_data:
64            if not obj.has(tag):
65                raise AttributeError(f"Data {tag} is needed by the pipeline but not provided by the object")
66
67        for stage in self.stages:
68            stage.run(obj)
69
70            consumes = stage.consumes if isinstance(stage.consumes, set) else {stage.consumes}
71            for tag in consumes:
72                obj.consume(tag)
73