#!/usr/bin/env python3
#
# Copyright (C) 2022 Intel Corporation.
#
# SPDX-License-Identifier: BSD-3-Clause
#


def _as_tag_set(value):
    """Normalize a stage's ``consumes``/``uses``/``provides`` declaration.

    A ``set`` (or ``frozenset``) is returned as a fresh ``set`` copy; any other
    value (typically a string) is interpreted as a single tag and wrapped in a
    unit set.
    """
    if isinstance(value, (set, frozenset)):
        return set(value)
    return {value}


class PipelineObject:
    """A bag of tagged data that is passed through the stages of a pipeline."""

    def __init__(self, **kwargs):
        """Create the object, storing each keyword argument under its name."""
        self.data = {}
        for tag, value in kwargs.items():
            self.set(tag, value)

    def set(self, tag, data):
        """Store ``data`` under ``tag``, replacing any previous value."""
        self.data[tag] = data

    def get(self, tag):
        """Return the data stored under ``tag``.

        Raises:
            KeyError: if nothing is stored under ``tag``.
        """
        return self.data[tag]

    def has(self, tag):
        """Return True if some data is stored under ``tag``."""
        return tag in self.data

    def consume(self, tag):
        """Remove ``tag`` and return its data, or None if ``tag`` is absent."""
        return self.data.pop(tag, None)

    def dump(self):
        """Print the underlying tag-to-data dictionary to standard output."""
        print(self.data)


class PipelineStage:
    """Base class of pipeline stages; subclasses override ``run``."""

    # The following three class variables define the inputs and outputs of the
    # stage. Each of them can be either a set or a string (which is
    # interpreted as a unit set).

    consumes = set()  # Data consumed by this stage. Consumed data will be unavailable to later stages.
    uses = set()      # Data used but not consumed by this stage.
    provides = set()  # Data provided by this stage.

    def run(self, obj):
        """Execute the stage on ``obj``; must be implemented by subclasses."""
        raise NotImplementedError


class PipelineEngine:
    """An ordered list of stages whose data dependencies are checked on insertion."""

    def __init__(self, initial_data=None):
        """Create an empty pipeline.

        Args:
            initial_data: iterable of tags that an object must already carry
                when it is passed to ``run``. Defaults to no tags.
        """
        tags = set() if initial_data is None else set(initial_data)
        self.stages = []
        self.initial_data = tags
        # Separate copy: this set evolves as stages are added, while
        # ``initial_data`` must keep describing the pipeline's entry contract.
        self.available_data = set(tags)

    def add_stage(self, stage):
        """Append ``stage`` after checking that its inputs are available.

        Raises:
            Exception: if the stage consumes or uses a tag that is neither
                initial data nor provided (and not yet consumed) by an
                earlier stage.
        """
        consumes = _as_tag_set(stage.consumes)
        uses = _as_tag_set(stage.uses)
        provides = _as_tag_set(stage.provides)

        all_uses = consumes | uses
        if not all_uses.issubset(self.available_data):
            raise Exception(f"Data {all_uses - self.available_data} need by stage {stage.__class__.__name__} but not provided by the pipeline")

        self.stages.append(stage)
        # Consumed tags disappear, then provided tags (possibly the same ones)
        # become available to the stages that follow.
        self.available_data = (self.available_data - consumes) | provides

    def add_stages(self, stages):
        """Append every stage in ``stages``, in order, via ``add_stage``."""
        for stage in stages:
            self.add_stage(stage)

    def run(self, obj):
        """Run all stages, in insertion order, on ``obj``.

        After each stage, the tags it consumes are removed from ``obj``.

        Raises:
            AttributeError: if ``obj`` lacks any of the pipeline's initial data.
        """
        for tag in self.initial_data:
            if not obj.has(tag):
                raise AttributeError(f"Data {tag} is needed by the pipeline but not provided by the object")

        for stage in self.stages:
            stage.run(obj)

            # A tag that the stage both consumes and provides has just been
            # re-provided by the stage; add_stage() treats it as available to
            # later stages, so it must not be dropped here.
            consumed = _as_tag_set(stage.consumes) - _as_tag_set(stage.provides)
            for tag in consumed:
                obj.consume(tag)