95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
import uuid
|
|
from pathlib import Path
|
|
|
|
from modelspace.Essentials import SetsDict
|
|
from modelspace.ModelPackage import ModelPackage
|
|
|
|
|
|
class ModelPackageSubRepository:
|
|
def __init__(self, path, seed):
|
|
path.mkdir(exist_ok=True)
|
|
self.path = Path(path)
|
|
self.seed = seed
|
|
self.packages: dict[str, ModelPackage] | None = None
|
|
self.resources: SetsDict | None = None
|
|
self.reload()
|
|
|
|
# Completed
|
|
def _reload_packages(self):
|
|
self.packages = dict()
|
|
try:
|
|
dirs = [item.name for item in self.path.iterdir() if item.is_dir()]
|
|
except OSError as e:
|
|
print(f"Ошибка доступа к директории: {e}")
|
|
dirs = []
|
|
|
|
for d in dirs:
|
|
package = ModelPackage.load(str(self.path / d))
|
|
self.packages[package.uuid] = package
|
|
|
|
# Completed
|
|
def _reload_resources(self):
|
|
self.resources = SetsDict()
|
|
|
|
for pkg_id, package in self.packages.items():
|
|
for resource in package.provides:
|
|
self.resources.add(resource, pkg_id)
|
|
|
|
def reload(self):
|
|
self._reload_packages()
|
|
self._reload_resources()
|
|
|
|
|
|
# debugged
|
|
def resources_from_pkg_list(self, uuids: list[str]):
|
|
selected_packages = []
|
|
for pkg_id in uuids:
|
|
package = self.packages.get(pkg_id, None)
|
|
selected_packages.append(package)
|
|
|
|
resources = SetsDict()
|
|
for package in selected_packages:
|
|
for resource in package.provides:
|
|
resources.add(resource, package.uuid)
|
|
|
|
return resources
|
|
|
|
@staticmethod
|
|
def deps_from_pkg_list(packages: list[ModelPackage]) -> set[str]:
|
|
res = set()
|
|
for package in packages: res = res | set(package.dependencies)
|
|
return res
|
|
|
|
# debugged
|
|
def packages_by_resource(self, resource):
|
|
packages_ids = self.resources.by_key(resource)
|
|
|
|
if not packages_ids or len(packages_ids) == 0:
|
|
raise RuntimeError(f"{resource}: There are no packages in the repository that provide this resource")
|
|
else:
|
|
packages_ids = list(packages_ids)
|
|
|
|
packages: set[ModelPackage] = set()
|
|
for pkg_id in packages_ids: packages.add(self.package_by_id(pkg_id))
|
|
return packages
|
|
|
|
# debugged
|
|
def package_by_id(self, pkg_id):
|
|
package = self.packages.get(pkg_id, None)
|
|
if not package: raise RuntimeError(f"{pkg_id}: Something went wrong while reading package info")
|
|
return package
|
|
|
|
|
|
def add_package_interactive(self) -> ModelPackage:
|
|
"""Добавляет новый пакет модели интерактивно"""
|
|
# Генерируем новый UUID
|
|
package_uuid = str(uuid.uuid4())
|
|
|
|
# Создаем путь к новому пакету
|
|
package_path = self.path / package_uuid
|
|
|
|
# Вызываем интерактивное создание пакета
|
|
package = ModelPackage.interactive(str(package_path), package_uuid)
|
|
loaded_package = ModelPackage.load(str(package_path))
|
|
self.packages[loaded_package.uuid] = loaded_package
|
|
return package |