Trying to construct modelspace

This commit is contained in:
2025-09-15 01:17:15 +07:00
parent 9651175e9a
commit 300d255008
10 changed files with 591 additions and 16 deletions

19
main.py
View File

@@ -1,3 +1,20 @@
from modelspace.ModelPackageSelector import ModelPackageSelector
from modelspace.ModelSpace import ModelSpace
from modelspace.Repository import global_repo
from shell.Interactive import Interactive
Interactive().start()
# Interactive().start()
m = ModelSpace(global_repo, '/mnt/vaiola/gagaga')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
d = m._deptree('flux')
# m.run('flux')
# m.install('flux-kontefghjvnxt')

52
modelspace/Essentials.py Normal file
View File

@@ -0,0 +1,52 @@
class ListsDict:
def __init__(self):
self._data: dict[str, list] = dict()
def add(self, key, value):
if key not in self._data: self._data[key] = list()
if value not in self._data[key]: self._data[key].append(value)
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = list()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.append(elem)
return res
def by_key(self, key):
return self._data.get(key, None)
class SetsDict:
def __init__(self):
self._data: dict[str, set] = dict()
def add(self, key, value):
if key not in self._data: self._data[key] = set()
if value not in self._data[key]: self._data[key].add(value)
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = set()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.add(elem)
return res
def by_key(self, key):
return self._data.get(key, None)
@property
def keys(self): return self._data.keys()

View File

@@ -1,4 +1,4 @@
import os.path
import os
import shutil
import uuid
from dataclasses import dataclass, fields
@@ -32,7 +32,7 @@ class PackageInfo(Config):
class ModelPackage:
def __init__(self, package_path: str, file_paths: List[str] = None, package_info: PackageInfo = None):
def __init__(self, package_path: str, file_paths: List[str] = None, package_info: PackageInfo = None, load=False):
self.path = Path(package_path)
# Создаем директорию если она не существует
@@ -44,6 +44,12 @@ class ModelPackage:
# Загружаем существующую информацию из файла
self.info = PackageInfo(filename=str(self.package_file))
self.files = []
if load:
self.files = self.get_files(package_path)
return
# Если package_info передан и не пустой, обновляем информацию
if package_info is not None:
# Обновляем только те поля, которые не определены
@@ -66,6 +72,7 @@ class ModelPackage:
self.files_path = self.path / "files"
self.files_path.mkdir(exist_ok=True)
if file_paths:
# Перемещаем файлы в директорию files
for file_path in file_paths:
@@ -77,6 +84,9 @@ class ModelPackage:
else:
shutil.copy2(src, dst)
def __str__(self): return self.name
def __repr__(self): return self.name
@classmethod
def interactive(cls, package_path: str, pkg_uuid = None):
"""Интерактивное создание пакета через консоль"""
@@ -149,6 +159,42 @@ class ModelPackage:
"""Возвращает название пакета"""
return self.info.name
@property
def description(self) -> str:
"""Возвращает название пакета"""
return self.info.description
@property
def release_date(self) -> str:
"""Возвращает название пакета"""
return self.info.release_date
@property
def package_type(self) -> str:
"""Возвращает название пакета"""
return self.info.package_type
@property
def lineage(self) -> str:
"""Возвращает название пакета"""
return self.info.lineage
@property
def size_bytes(self) -> int:
"""Возвращает название пакета"""
return self.info.size_bytes
@property
def version(self) -> str:
"""Возвращает название пакета"""
return self.info.version
@property
def quantization(self) -> str:
"""Возвращает название пакета"""
return self.info.quantization
@property
def dependencies(self) -> List[str]:
"""Возвращает список зависимостей пакета"""
@@ -162,6 +208,47 @@ class ModelPackage:
provides_list.append(self.info.name)
return provides_list
@classmethod
def load(cls, package_path: str): return cls(package_path, load=True)
@staticmethod
def get_files(package_path):
"""
Получает путь к директории, ищет в ней поддиректорию 'files',
и возвращает список относительных путей ко всем файлам внутри 'files'.
:param package_path: Путь к базовой директории
:return: Список относительных путей внутри директории 'files'
"""
files_dir = os.path.join(package_path, 'files')
# Проверяем, существует ли директория 'files'
if not os.path.exists(files_dir):
return []
if not os.path.isdir(files_dir):
return []
relative_paths = []
# Рекурсивно обходим директорию 'files'
for root, dirs, files in os.walk(files_dir):
# Вычисляем относительный путь от 'files' к текущей директории
rel_root = os.path.relpath(root, files_dir)
# Добавляем файлы
for file in files:
if rel_root == '.':
relative_paths.append(file)
else:
relative_paths.append(os.path.join(rel_root, file))
return relative_paths
if __name__ == "__main__":
p = ModelPackage('/tmp/pkg')
pass

View File

@@ -0,0 +1,154 @@
from modelspace.Essentials import SetsDict
from modelspace.ModelPackage import ModelPackage
from modelspace.ModelPackageSubRepository import ModelPackageSubRepository
def select_elements(lst, selection_string):
"""
Выбирает элементы из списка согласно строке выбора
Args:
lst: Исходный список
selection_string: Строка вида "1 2 4-6 all"
Returns:
Новый список с выбранными элементами, отсортированными по номерам
"""
selection_string = selection_string.strip()
if not selection_string.strip():
return []
if selection_string == "all":
return lst.copy()
selected_indices = set()
parts = selection_string.split()
for part in parts:
if '-' in part:
# Обработка диапазона
start, end = map(int, part.split('-'))
# Обработка диапазона в любом направлении
if start <= end:
selected_indices.update(range(start, end + 1))
else:
selected_indices.update(range(start, end - 1, -1))
else:
# Обработка отдельного элемента
selected_indices.add(int(part))
# Преобразуем в список и сортируем по номерам
sorted_indices = sorted(selected_indices)
# Выбираем элементы
result = []
for idx in sorted_indices:
if 0 <= idx < len(lst):
result.append(lst[idx])
return result
def format_bytes(bytes_size):
"""Convert bytes to human readable format"""
if bytes_size < 1024:
return f"{bytes_size} B"
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} PB"
"""
Инициация: repo - репозиторий, из которого устанавливаются пакеты
Вход: resource - текстовое имя ресурса
"""
class ModelPackageSelector:
empty_str = ''
def __init__(self, repo: ModelPackageSubRepository, installed_packages: set[ModelPackage]):
self.repo: ModelPackageSubRepository = repo
self._pending_files = SetsDict()
self._installed_packages: set[ModelPackage] = installed_packages
# Completed
def run(self, resource: str, answer = None) -> set[ModelPackage]:
available_packages = self.repo.packages_by_resource(resource)
if len(available_packages) == 0: raise RuntimeError
installed_packages = available_packages & self._installed_packages
available_packages = available_packages - installed_packages
if len(available_packages) == 0:
self._installed_packages = self._installed_packages | set(available_packages)
return available_packages
if len(available_packages) == 1:
self._installed_packages = self._installed_packages | set(available_packages)
return available_packages
# running interactive selection
return self._interactive(resource, list(available_packages), list(installed_packages), answer)
def _interactive(self, resource: str, available, installed, answer = None) -> set[ModelPackage]:
print(f'There are several packages, satisfies resource {resource}. Please select one or more. Syntax: 1 2 4-5 all')
print_data = {'Installed packages: ' : installed, 'Available packages: ' : available, }
for prompt, pr_packages in print_data.items():
if len(pr_packages) == 0: continue
print(prompt)
cols = [['N'], ['name'], ['ver'], ['quant'], ['size']]
descs = []
for i in range(len(pr_packages)):
p: ModelPackage = pr_packages[i]
cols[0].append(i)
cols[1].append(p.name)
cols[2].append(p.version)
cols[3].append(p.quantization)
cols[4].append(format_bytes(p.size_bytes))
descs.append(p.description)
col_widths = [0, 0, 0, 0, 0]
for i in range(len(cols)):
for elem in cols[i]:
length = len(str(elem))
if length > col_widths[i] - 1: col_widths[i] = length + 1
for i in range(len(cols[0])):
print(' ' +
f'{cols[0][i]:<{col_widths[0]}}' +
f'{cols[1][i]:<{col_widths[1]}}' +
f'{cols[2][i]:<{col_widths[2]}}' +
f'{cols[3][i]:<{col_widths[3]}}' +
f'{cols[4][i]:<{col_widths[4]}}'
)
if i != 0: print(' ' + f'{self.empty_str:<{col_widths[0]}}' + f'{descs[i - 1]}')
while True:
if answer:
inp = answer
answer = None
else:
inp = input("Your choice: ")
if inp == '': continue
try:
selected_packages: list[ModelPackage] = select_elements(available, selection_string=inp)
except Exception:
continue
if selected_packages and len(selected_packages) > 0:
self._installed_packages = self._installed_packages | set(selected_packages)
return set(selected_packages)
return set()

View File

@@ -0,0 +1,95 @@
import uuid
from pathlib import Path
from modelspace.Essentials import SetsDict
from modelspace.ModelPackage import ModelPackage
class ModelPackageSubRepository:
def __init__(self, path, seed):
path.mkdir(exist_ok=True)
self.path = Path(path)
self.seed = seed
self.packages: dict[str, ModelPackage] | None = None
self.resources: SetsDict | None = None
self.reload()
# Completed
def _reload_packages(self):
self.packages = dict()
try:
dirs = [item.name for item in self.path.iterdir() if item.is_dir()]
except OSError as e:
print(f"Ошибка доступа к директории: {e}")
dirs = []
for d in dirs:
package = ModelPackage.load(str(self.path / d))
self.packages[package.uuid] = package
# Completed
def _reload_resources(self):
self.resources = SetsDict()
for pkg_id, package in self.packages.items():
for resource in package.provides:
self.resources.add(resource, pkg_id)
def reload(self):
self._reload_packages()
self._reload_resources()
# debugged
def resources_from_pkg_list(self, uuids: list[str]):
selected_packages = []
for pkg_id in uuids:
package = self.packages.get(pkg_id, None)
selected_packages.append(package)
resources = SetsDict()
for package in selected_packages:
for resource in package.provides:
resources.add(resource, package.uuid)
return resources
@staticmethod
def deps_from_pkg_list(packages: list[ModelPackage]) -> set[str]:
res = set()
for package in packages: res = res | set(package.dependencies)
return res
# debugged
def packages_by_resource(self, resource):
packages_ids = self.resources.by_key(resource)
if not packages_ids or len(packages_ids) == 0:
raise RuntimeError(f"{resource}: There are no packages in the repository that provide this resource")
else:
packages_ids = list(packages_ids)
packages: set[ModelPackage] = set()
for pkg_id in packages_ids: packages.add(self.package_by_id(pkg_id))
return packages
# debugged
def package_by_id(self, pkg_id):
package = self.packages.get(pkg_id, None)
if not package: raise RuntimeError(f"{pkg_id}: Something went wrong while reading package info")
return package
def add_package_interactive(self) -> ModelPackage:
"""Добавляет новый пакет модели интерактивно"""
# Генерируем новый UUID
package_uuid = str(uuid.uuid4())
# Создаем путь к новому пакету
package_path = self.path / package_uuid
# Вызываем интерактивное создание пакета
package = ModelPackage.interactive(str(package_path), package_uuid)
loaded_package = ModelPackage.load(str(package_path))
self.packages[loaded_package.uuid] = loaded_package
return package

136
modelspace/ModelSpace.py Normal file
View File

@@ -0,0 +1,136 @@
from pathlib import Path
import os
from modelspace.ModelPackage import ModelPackage
from modelspace.ModelPackageSelector import ModelPackageSelector
from modelspace.ModelSpaceDatabase import ModelSpaceDatabase
from modelspace.Repository import Repository
class ModelSpace:
def __init__(self, repo: Repository, path):
self.repo = repo
self.path = path
self.config_dir = Path(self.path) / '.vaiola'
os.makedirs(self.config_dir, exist_ok=True)
self._copier = self._hard_link_copier
self.db = ModelSpaceDatabase(str(self.config_dir))
self.selector = ModelPackageSelector(self.repo.model_sub_repo, set(self.installed_packages))
self.temp_installed_resources = self.installed_resources
def _reset_selector(self):
self.selector = ModelPackageSelector(self.repo.model_sub_repo, set(self.installed_packages))
self.temp_installed_resources = self.installed_resources
def check_for_file_conflicts(self, path: str | list[str]) -> list:
# TODO
raise NotImplemented
# conflicts = []
# if not isinstance(path, list): path = [path]
# for p in path:
# fullpath = Path(self.path) / p
# if fullpath.exists() or p in self.temp['pending_files'].index: conflicts.append(p)
# return conflicts
def get_dest_dir(self, package: ModelPackage):
# TODO
raise NotImplemented
def _register_package_in_temp(self, package: ModelPackage):
for resource in package.provides: self.temp_installed_resources.add(resource, package.uuid)
def _deptree(self, resource: str, answer = None, depth = 0) -> set[ModelPackage]:
selected_packages = set()
packages = self.selector.run(resource, answer)
deps = self.repo.model_sub_repo.deps_from_pkg_list(packages)
deps = deps - set(self.temp_installed_resources.keys)
for package in packages: self._register_package_in_temp(package)
for dep in deps:
selected_packages = selected_packages | self._deptree(dep, answer, depth + 1)
return selected_packages | packages
def install(self, resources: str | list[str], depth = 0, answer = None) -> None:
if depth == 0: self._reset_selector()
if isinstance(resources, str): resources = [resources]
selected_packages = set()
for resource in resources:
selected_packages = selected_packages | self._deptree(resource, answer)
# TODO check file conflicts
# TODO run copier
pass
@staticmethod
def _hard_link_copier(source_dir, dest_dir, file_paths):
"""Создает жесткие ссылки для файлов из источника в директорию назначения
Args:
source_dir (str): Путь к директории-источнику
dest_dir (str): Путь к директории-назначению
file_paths (list): Список относительных путей файлов в директории источника
"""
for file_path in file_paths:
# Формируем полный путь к исходному файлу
source_file = os.path.join(source_dir, file_path)
# Формируем полный путь к целевому файлу
dest_file = os.path.join(dest_dir, file_path)
# Создаем необходимые поддиректории в директории назначения
dest_dir_name = os.path.dirname(dest_file)
if dest_dir_name and not os.path.exists(dest_dir_name):
os.makedirs(dest_dir_name)
# Создаем жесткую ссылку
os.link(source_file, dest_file)
@property
def available_packages(self): return self.repo.model_sub_repo.packages
@property
def available_resources(self): return self.repo.model_sub_repo.resources
@property
def installed_manuals(self): return [m[0] if m and len(m) > 0 else None for m in self.db.get_all_manuals()] if self.db.get_all_manuals() else list()
@property
def installed_deps(self): return [m[0] if m and len(m) > 0 else None for m in self.db.get_all_deps()] if self.db.get_all_deps() else list()
@property
def installed_packages(self): return self.installed_manuals + self.installed_deps
@property
def installed_resources(self): return self.repo.model_sub_repo.resources_from_pkg_list(self.installed_packages)

View File

@@ -0,0 +1,39 @@
import sqlite3 as sq
from pathlib import Path
class ModelSpaceDatabase:
def __init__(self, path):
self.path = path
self.con = sq.connect(Path(self.path) / "modelspace.db")
self.con.autocommit = True
self.cur = self.con.cursor()
self.create(self.cur)
@staticmethod
def create(cur):
cur.execute('CREATE TABLE IF NOT EXISTS files (uuid TEXT NOT NULL, file TEXT NOT NULL, PRIMARY KEY(uuid, file))')
cur.execute('CREATE TABLE IF NOT EXISTS manual (uuid TEXT NOT NULL PRIMARY KEY)')
cur.execute('CREATE TABLE IF NOT EXISTS deps (uuid TEXT NOT NULL PRIMARY KEY)')
def create_file(self, uuid, file): self.cur.execute('INSERT OR IGNORE INTO files (uuid, file) VALUES (?, ?)', (uuid, file))
def delete_file(self, uuid, file): self.cur.execute('DELETE FROM files WHERE uuid = ? AND file = ?', (uuid, file))
def get_all_files(self): self.cur.execute('SELECT * FROM files'); return self.cur.fetchall()
def get_files_by_uuid(self, uuid): self.cur.execute('SELECT file FROM files WHERE uuid = ?', (uuid,)); return self.cur.fetchall()
def get_uuid_by_file(self, file): self.cur.execute('SELECT uuid FROM files WHERE file = ?', (file,)); return self.cur.fetchall()
def create_manual(self, uuid): self.cur.execute('INSERT OR IGNORE INTO manual (uuid) VALUES (?)', (uuid,))
def delete_manual(self, uuid): self.cur.execute('DELETE FROM manual WHERE uuid = ?', (uuid,))
def get_all_manuals(self): self.cur.execute('SELECT * FROM manual'); return self.cur.fetchall()
def create_deps(self, uuid): self.cur.execute('INSERT OR IGNORE INTO deps (uuid) VALUES (?)', (uuid,))
def delete_deps(self, uuid): self.cur.execute('DELETE FROM deps WHERE uuid = ?', (uuid,))
def get_all_deps(self): self.cur.execute('SELECT * FROM deps'); return self.cur.fetchall()
def get_files_count(self): self.cur.execute('SELECT COUNT(*) FROM files'); return self.cur.fetchone()[0]
def get_manuals_count(self): self.cur.execute('SELECT COUNT(*) FROM manual'); return self.cur.fetchone()[0]
def get_deps_count(self): self.cur.execute('SELECT COUNT(*) FROM deps'); return self.cur.fetchone()[0]
def clear_table(self, table_name): self.cur.execute(f'DELETE FROM {table_name}')

View File

@@ -3,6 +3,7 @@ from dataclasses import dataclass
from pathlib import Path
from modelspace.ModelPackage import ModelPackage
from modelspace.ModelPackageSubRepository import ModelPackageSubRepository
from pythonapp.Libs.ConfigDataClass import Config
@@ -31,27 +32,22 @@ class Repository:
self._generate_and_save_seed()
# Создаем поддиректорию model-packages если она не существует
self.model_packages_path = self.path / "model-packages"
self.model_packages_path.mkdir(exist_ok=True)
self.model_sub_repo = ModelPackageSubRepository(self.path / "model-packages", self.seed)
# Completed
def _generate_and_save_seed(self) -> None:
"""Генерирует новый UUID и сохраняет его в конфиг"""
self.config.seed = str(uuid.uuid4())
self.config.save() # Сохраняем сразу после генерации
# Completed
@property
def seed(self) -> str:
"""Возвращает текущий сид"""
return self.config.seed
def add_model_package_interactive(self) -> ModelPackage:
"""Добавляет новый пакет модели интерактивно"""
# Генерируем новый UUID
package_uuid = str(uuid.uuid4())
return self.model_sub_repo.add_package_interactive()
# Создаем путь к новому пакету
package_path = self.model_packages_path / package_uuid
# Вызываем интерактивное создание пакета
package = ModelPackage.interactive(str(package_path), package_uuid)
return package
global_repo = Repository(str(Path('..') / 'repo'))

View File

@@ -1,7 +1,5 @@
from pathlib import Path
from modelspace.Repository import Repository
global_repo = Repository(str(Path('..') / 'repo'))
class ExecutionError(RuntimeError): pass

View File

@@ -1,4 +1,5 @@
from shell.Handlers.ABS import Handler, global_repo
from shell.Handlers.ABS import Handler
from modelspace.Repository import global_repo
class ModelSpaceHandler(Handler):