155 lines
5.4 KiB
Python
155 lines
5.4 KiB
Python
from modelspace.Essentials import SetsDict
|
||
from modelspace.ModelPackage import ModelPackage
|
||
from modelspace.ModelPackageSubRepository import ModelPackageSubRepository
|
||
|
||
|
||
def select_elements(lst, selection_string):
|
||
"""
|
||
Выбирает элементы из списка согласно строке выбора
|
||
|
||
Args:
|
||
lst: Исходный список
|
||
selection_string: Строка вида "1 2 4-6 all"
|
||
|
||
Returns:
|
||
Новый список с выбранными элементами, отсортированными по номерам
|
||
"""
|
||
selection_string = selection_string.strip()
|
||
if not selection_string.strip():
|
||
return []
|
||
|
||
if selection_string == "all":
|
||
return lst.copy()
|
||
|
||
selected_indices = set()
|
||
parts = selection_string.split()
|
||
|
||
for part in parts:
|
||
if '-' in part:
|
||
# Обработка диапазона
|
||
start, end = map(int, part.split('-'))
|
||
# Обработка диапазона в любом направлении
|
||
if start <= end:
|
||
selected_indices.update(range(start, end + 1))
|
||
else:
|
||
selected_indices.update(range(start, end - 1, -1))
|
||
else:
|
||
# Обработка отдельного элемента
|
||
selected_indices.add(int(part))
|
||
|
||
# Преобразуем в список и сортируем по номерам
|
||
sorted_indices = sorted(selected_indices)
|
||
|
||
# Выбираем элементы
|
||
result = []
|
||
for idx in sorted_indices:
|
||
if 0 <= idx < len(lst):
|
||
result.append(lst[idx])
|
||
|
||
return result
|
||
|
||
def format_bytes(bytes_size):
|
||
"""Convert bytes to human readable format"""
|
||
if bytes_size < 1024:
|
||
return f"{bytes_size} B"
|
||
|
||
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
|
||
if bytes_size < 1024.0:
|
||
return f"{bytes_size:.1f} {unit}"
|
||
bytes_size /= 1024.0
|
||
|
||
return f"{bytes_size:.1f} PB"
|
||
|
||
|
||
"""
|
||
Инициация: repo - репозиторий, из которого устанавливаются пакеты
|
||
Вход: resource - текстовое имя ресурса
|
||
|
||
|
||
|
||
|
||
|
||
"""
|
||
|
||
|
||
class ModelPackageSelector:
|
||
empty_str = ''
|
||
|
||
def __init__(self, repo: ModelPackageSubRepository, installed_packages: set[ModelPackage]):
|
||
self.repo: ModelPackageSubRepository = repo
|
||
self._pending_files = SetsDict()
|
||
self._installed_packages: set[ModelPackage] = installed_packages
|
||
|
||
# Completed
|
||
def run(self, resource: str, answer = None) -> set[ModelPackage]:
|
||
available_packages = self.repo.packages_by_resource(resource)
|
||
if len(available_packages) == 0: raise RuntimeError
|
||
installed_packages = available_packages & self._installed_packages
|
||
available_packages = available_packages - installed_packages
|
||
|
||
if len(available_packages) == 0:
|
||
print("All possible packages for your query had been installed")
|
||
return available_packages
|
||
if len(available_packages) == 1:
|
||
self._installed_packages = self._installed_packages | set(available_packages)
|
||
return available_packages
|
||
|
||
# running interactive selection
|
||
return self._interactive(resource, list(available_packages), list(installed_packages), answer)
|
||
|
||
|
||
def _interactive(self, resource: str, available, installed, answer = None) -> set[ModelPackage]:
|
||
print(f'There are several packages, satisfies resource {resource}. Please select one or more. Syntax: 1 2 4-5 all')
|
||
|
||
|
||
print_data = {'Installed packages: ' : installed, 'Available packages: ' : available, }
|
||
for prompt, pr_packages in print_data.items():
|
||
if len(pr_packages) == 0: continue
|
||
print(prompt)
|
||
cols = [['N'], ['name'], ['ver'], ['quant'], ['size']]
|
||
descs = []
|
||
|
||
for i in range(len(pr_packages)):
|
||
p: ModelPackage = pr_packages[i]
|
||
cols[0].append(i)
|
||
cols[1].append(p.name)
|
||
cols[2].append(p.version)
|
||
cols[3].append(p.quantization)
|
||
cols[4].append(format_bytes(p.size_bytes))
|
||
descs.append(p.description)
|
||
|
||
col_widths = [0, 0, 0, 0, 0]
|
||
for i in range(len(cols)):
|
||
for elem in cols[i]:
|
||
length = len(str(elem))
|
||
if length > col_widths[i] - 1: col_widths[i] = length + 1
|
||
|
||
for i in range(len(cols[0])):
|
||
print(' ' +
|
||
f'{cols[0][i]:<{col_widths[0]}}' +
|
||
f'{cols[1][i]:<{col_widths[1]}}' +
|
||
f'{cols[2][i]:<{col_widths[2]}}' +
|
||
f'{cols[3][i]:<{col_widths[3]}}' +
|
||
f'{cols[4][i]:<{col_widths[4]}}'
|
||
)
|
||
if i != 0: print(' ' + f'{self.empty_str:<{col_widths[0]}}' + f'{descs[i - 1]}')
|
||
|
||
while True:
|
||
if answer:
|
||
inp = answer
|
||
answer = None
|
||
else:
|
||
inp = input("Your choice: ")
|
||
|
||
if inp == '': continue
|
||
try:
|
||
selected_packages: list[ModelPackage] = select_elements(available, selection_string=inp)
|
||
except Exception:
|
||
continue
|
||
if selected_packages and len(selected_packages) > 0:
|
||
self._installed_packages = self._installed_packages | set(selected_packages)
|
||
return set(selected_packages)
|
||
|
||
return set()
|
||
|