1#!/usr/bin/env python3
2"""
3misc/config_tools/configurator/packages/configurator/thirdLib/manager.py
4depend on misc/config_tools/configurator/packages/configurator/thirdLib/library.json
5"""
6import argparse
7import os
8import json
9import shutil
10import tarfile
11from pathlib import Path
12
13import requests
14from tqdm.auto import tqdm
15
16
def check(library_info, operation):
    """Walk every library entry and apply *operation* to it.

    For each entry of ``library_info['library']`` whose ``check`` type is
    ``'file'``, the presence of the check file decides what happens:

    * ``'check'``   -- print ``<name>: <True|False>``.
    * ``'install'`` -- run the entry's install steps when the file is missing.
    * ``'clean'``   -- remove the entry's clean paths when the file exists.

    Entries with any other check type are skipped.
    """
    report_only = operation == 'check'
    if report_only:
        print('Check:')

    for entry in library_info['library']:
        library_name = entry["name"]

        probe = entry['check']
        if probe['type'] != 'file':
            continue

        present = os.path.isfile(probe['path'])

        if report_only:
            print(f'{library_name}: {present}')
        elif operation == 'install':
            if not present:
                install(library_name, entry["install"])
        elif operation == 'clean':
            if present:
                clean(library_name, entry['clean'])
35
36
def _is_within_directory(directory, target):
    """Return True when *target* is *directory* itself or lies beneath it."""
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        # commonpath compares whole path components. commonprefix compares
        # characters, so it would accept a sibling such as "<directory>_evil".
        return os.path.commonpath([abs_directory, abs_target]) == abs_directory
    except ValueError:
        # No common path exists (e.g. different drives on Windows).
        return False


def _safe_extract(tar, path=".", members=None, *, numeric_owner=False):
    """Extract *tar* into *path*, refusing archives that would escape it.

    Only member names are validated here; link targets are not inspected.

    Raises:
        Exception: a member name resolves outside *path*.
    """
    for member in tar.getmembers():
        member_path = os.path.join(path, member.name)
        if not _is_within_directory(path, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tar.extractall(path, members, numeric_owner=numeric_owner)


def install(library_name, library_install):
    """Run the install steps of one library, in order.

    Each step is a dict whose ``'type'`` selects the action:

    * ``copy``        -- copy file ``from`` to ``to``.
    * ``download``    -- stream URL ``from`` into file ``to`` with a progress bar.
    * ``extract``     -- unpack tar archive ``from`` into directory ``to``.
    * ``remove``      -- delete ``path`` (a file, otherwise treated as a tree).
    * ``replaceText`` -- replace ``old`` with ``new`` inside UTF-8 file ``file``.

    Raises:
        ValueError: unknown step type, or an archive whose suffix is not one
            of ``gz``, ``bz2``, ``xz``, ``tar``.
        Exception: an archive member would be extracted outside the target.
    """
    print(f'Install: {library_name}')
    for step in library_install:
        step_type = step['type']
        if step_type == "copy":
            copy_from = step['from']
            copy_to = step['to']
            shutil.copyfile(copy_from, copy_to)
            print(f"copy: {copy_to} success")
        elif step_type == "download":
            download_from = step['from']
            download_to = step['to']

            with requests.get(download_from, stream=True) as r:
                # Fail here rather than saving an HTTP error page as the file.
                r.raise_for_status()
                # The header is optional (e.g. chunked responses); tqdm
                # accepts total=None for an unknown length.
                content_length = r.headers.get("Content-Length")
                total_length = (
                    int(content_length) if content_length is not None else None
                )
                with tqdm.wrapattr(r.raw, "read", total=total_length, desc="") as raw:
                    with open(download_to, 'wb') as output:
                        shutil.copyfileobj(raw, output)
            print(f'download: {download_to} success')
        elif step_type == "extract":
            tar_file: str = step['from']
            tar_mode = tar_file.split('.')[-1]
            # Explicit raise: an assert would be stripped under ``python -O``.
            if tar_mode not in ('gz', 'bz2', 'xz', 'tar'):
                raise ValueError(f'unsupported archive suffix: {tar_file}')
            print(f'extract: {tar_file}')
            with tarfile.open(tar_file, 'r') as tar:
                _safe_extract(tar, path=step["to"], members=tar)
            print(f'extract: {tar_file} success')
        elif step_type == 'remove':
            remove_path = step['path']
            if os.path.isfile(remove_path):
                os.remove(remove_path)
            else:
                shutil.rmtree(remove_path)
            print(f'remove: {remove_path} success')
        elif step_type == "replaceText":
            filename = step['file']
            with open(filename, encoding='utf-8') as source:
                file_content = source.read()
            replace_old = step['old']
            replace_new = step['new']
            file_content = file_content.replace(replace_old, replace_new)
            with open(filename, 'w', encoding='utf-8') as target:
                target.write(file_content)
            print(f"replaceText: {filename} success")
        else:
            print(step)
            raise ValueError(f'unknown install step type: {step_type!r}')
    print(f'Install: {library_name} success')
102
103
def clean(library_name, library_clean):
    """Delete every path listed in *library_clean* for one library.

    Regular files are unlinked and directories are removed recursively; a
    path that is neither is left alone. A ``remove: ... success`` line is
    printed for every listed path either way.
    """
    print(f'Clean: {library_name}')
    for target in library_clean:
        remover = None
        if os.path.isfile(target):
            remover = os.remove
        elif os.path.isdir(target):
            remover = shutil.rmtree

        if remover is not None:
            remover(target)
        print(f'remove: {target} success')
    print(f'Clean: {library_name} success')
113
114
def manager(operation, library_info):
    """Run *operation* from this file's directory, then restore the cwd.

    The process working directory is switched to the directory containing
    this file before ``check`` runs (presumably because paths in the library
    description are relative to it -- confirm against library.json).

    Operations other than ``'check'``, ``'install'`` and ``'clean'`` are
    ignored. The original working directory is restored even when the
    operation raises.
    """
    cwd = os.path.abspath(os.getcwd())

    third_dir = os.path.dirname(
        os.path.abspath(__file__)
    )
    os.chdir(third_dir)

    try:
        if operation in ['check', 'install', 'clean']:
            check(library_info, operation)
    finally:
        # Always undo the chdir so a failing step does not leave the caller
        # stranded in this directory.
        os.chdir(cwd)
126
127
def main():
    """Parse the command line, load the library description and run it.

    Positional argument ``operation`` is one of ``check``, ``install`` or
    ``clean``. ``-c/--config`` selects the JSON description and defaults to
    ``library.json`` next to this file.
    """
    library_json = Path(__file__).parent / 'library.json'

    parser = argparse.ArgumentParser(
        description='ACRN Configurator third part library manager.'
    )
    parser.add_argument('operation', choices=['check', 'install', 'clean'])
    parser.add_argument('-c', '--config', dest='config', default=library_json)

    args = parser.parse_args()
    # Close the config file deterministically instead of leaking the handle.
    with open(args.config, encoding='utf-8') as config_file:
        library_info = json.load(config_file)

    manager(args.operation, library_info)
141
142
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()
145