Clean garbage

This commit is contained in:
2025-09-16 20:05:55 +07:00
parent 98aa64515f
commit 1a992557eb
13 changed files with 20 additions and 1116 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__/
repo/
deprecated

View File

@@ -1,56 +0,0 @@
from sqlite3 import connect
from shell.Handlers.InstanceHandler import InstanceHandler
class Handler:
def __init__(self):
self.exec_succeed = True
self.global_commands = GlobalCommandsHandler()
self.instance_commands = InstanceHandler()
pass
def handle(self, command: list[str]):
if len(command) == 0: return
verb = command[0].upper()
if verb == 'CHECK':
if not self.exec_succeed: raise RuntimeWarning("Last command execution failed")
else: return
else: self.exec_succeed = False
if verb == 'PYTHONAPP': self.instance_commands.handle(command[1:])
else: self.global_commands.handle(command)
self.exec_succeed = True
class GlobalCommandsHandler:
def __init__(self):
pass
def _unknown_command(self, command: list[str]):
command_str = ''
for word in command: command_str += word + ' '
raise ValueError(f"Unknown command: {command_str}")
def handle(self, command: list[str]):
if len(command) == 0: return
verb = command[0].upper()
if verb == "TELL":
command_str = ''
for word in command[1:]: command_str += word + ' '
print(command_str)
elif verb == "EXIT": raise KeyboardInterrupt
else:
self._unknown_command(command)
return

View File

@@ -1,88 +0,0 @@
from pythonapp.Instance.Instance import Instance
class InstanceHandler:
def __init__(self):
self._loaded_instances: dict[str, Instance] = {}
self._active_instance: Instance | None = None
pass
def _unknown_command(self, command: list[str]):
command_str = ''
for word in command: command_str += word + ' '
raise ValueError(f"Unknown INSTANCE subcommand: {command_str}")
def _create_handler(self, command: list[str]):
if len(command) < 4: raise ValueError("CREATE INSTANCE: missing arguments")
if command[1].upper() == "VENV":
Instance(path=command[3], env=command[1].lower(), python=command[2]).create()
def _activate_handler(self, command: list[str]):
if len(command) != 2: raise ValueError("ACTIVATE INSTANCE: missing arguments")
i = self._loaded_instances.get(command[1], None)
if i: self._active_instance = i
else: raise ValueError("ACTIVATE INSTANCE: instance not exists")
def _load_handler(self, command: list[str]):
if len(command) != 3: raise ValueError("LOAD INSTANCE: missing arguments")
name = command[1]
path = command[2]
i = Instance(path)
if not i.config.created: raise ValueError("ACTIVATE INSTANCE: instance not exists")
self._loaded_instances[name] = i
self._active_instance = i
print(f"instance {path} loaded and activated. identified by {name}")
def _show_handler(self, command: list[str]):
print("Environment type:", self._active_instance.config.env_type)
def _install_handler(self, command: list[str]):
if len(command) <= 2: raise ValueError("missing arguments")
verb = command[1].lower()
if verb == "package":
args = command[2:]
pkgs = []
repo = None
for arg in args:
if arg.lower().startswith("repo:"): repo = arg[5:]
else: pkgs.append(arg)
self._active_instance.install_packages(pkgs, repo)
if verb == "gitapp":
app_extensions_dir: str = None
app_models_dir: str = None
app_output_dir: str = None
app_input_dir: str = None
app_user_dir: str = None
req_file: str = 'requirements.txt'
url = command[2]
args = command[3:]
for arg in args:
if arg.lower().startswith("ext:"): app_extensions_dir = arg[4:]
elif arg.lower().startswith("mod:"): app_models_dir = arg[4:]
elif arg.lower().startswith("out:"): app_output_dir = arg[4:]
elif arg.lower().startswith("inp:"): app_input_dir = arg[4:]
elif arg.lower().startswith("usr:"): app_user_dir = arg[4:]
elif arg.lower().startswith("req:"): req_file = arg[4:]
self._active_instance.install_git_app(url, req_file, app_extensions_dir, app_models_dir, app_output_dir,
app_input_dir, app_user_dir)
def handle(self, command: list[str]):
if len(command) == 0: return
verb = command[0].upper()
if verb == "CREATE": self._create_handler(command)
elif verb == "LOAD": self._load_handler(command)
elif verb == "ACTIVATE": self._activate_handler(command)
elif verb == "SHOW": self._show_handler(command)
elif verb == "INSTALL": self._install_handler(command)
else:
self._unknown_command(command)
return
pass

View File

@@ -1,318 +0,0 @@
import subprocess
import re
from dataclasses import dataclass, field
from typing import List, Dict, Set, Optional
from enum import Enum
import logging
from pythonapp.Decider.Loader import RequirementInfo
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ConflictGrubber:
"""Класс для разрешения конфликтов зависимостей."""
conflict_errors: List[ConflictError] = field(default_factory=list)
max_attempts: int = 10
def check_for_conflicts(self,
env,
installed_reqs: List[RequirementInfo],
requested_reqs: List[RequirementInfo],
repo_url: str) -> List[ConflictError]:
"""
Проверяет конфликты зависимостей между установленными и запрашиваемыми пакетами.
Args:
env: Виртуальное окружение
installed_reqs: Список установленных пакетов
requested_reqs: Список запрашиваемых пакетов
repo_url: URL репозитория пакетов
Returns:
Список ошибок конфликтов
"""
self.conflict_errors.clear()
current_requested = requested_reqs.copy()
attempt = 0
# Создаем словари для быстрого доступа
installed_dict = {req.package_name: req for req in installed_reqs}
requested_dict = {req.package_name: req for req in current_requested}
while attempt < self.max_attempts:
attempt += 1
logger.info(f"Попытка разрешения конфликтов #{attempt}")
# Формируем команду для dry-run
all_requirements = [req.requirement_str for req in installed_reqs + current_requested]
# Выполняем dry-run установки
result = self._run_dry_install(env, all_requirements, repo_url)
# Анализируем результат
errors = self._parse_pip_output(result.stderr, result.stdout)
if not errors:
logger.info("Конфликты не обнаружены")
break
# Обрабатываем ошибки
if not self._process_errors(errors, installed_dict, requested_dict, current_requested):
logger.warning("Обнаружены неустранимые конфликты")
break
return self.conflict_errors
def _run_dry_install(self, env, requirements: List[str], repo_url: str) -> subprocess.CompletedProcess:
"""Выполняет dry-run установки пакетов."""
command = [env.pip_path, "install", "--dry-run", "--no-cache-dir"]
command.extend(requirements)
if repo_url:
command.extend(["--extra-index-url", repo_url])
logger.debug(f"Выполнение команды: {' '.join(command)}")
return subprocess.run(command, capture_output=True, text=True, timeout=300)
def _parse_pip_output(self, stderr: str, stdout: str) -> List[Dict]:
"""Анализирует вывод pip и извлекает информацию о конфликтах."""
errors = []
output = stderr + stdout
# Ищем конфликты версий
version_conflicts = re.findall(
r'ERROR: Cannot install ([^\n]+) because these package versions have conflicting dependencies',
output
)
for conflict in version_conflicts:
packages = re.findall(r'([a-zA-Z0-9_\-\.]+)==([a-zA-Z0-9_\-\.]+)', conflict)
if packages:
error_info = {
'type': 'version_conflict',
'packages': {pkg: ver for pkg, ver in packages},
'message': f"Конфликт версий: {conflict}"
}
errors.append(error_info)
# Ищем несовместимые требования
requirement_conflicts = re.findall(
r'ERROR: ([a-zA-Z0-9_\-\.]+) ([^\n]+) has requirement ([^\n]+), but you\'ll have ([^\n]+) which is incompatible',
output
)
for pkg, version, required, actual in requirement_conflicts:
error_info = {
'type': 'requirement_conflict',
'package': pkg.strip(),
'required': required.strip(),
'actual': actual.strip(),
'message': f"{pkg} {version} требует {required}, но установлена {actual}"
}
errors.append(error_info)
return errors
def _process_errors(self, errors: List[Dict],
installed_dict: Dict[str, RequirementInfo],
requested_dict: Dict[str, RequirementInfo],
current_requested: List[RequirementInfo]) -> bool:
"""Обрабатывает ошибки и пытается их разрешить."""
resolved = True
for error in errors:
if error['type'] == 'version_conflict':
conflict_resolved = self._handle_version_conflict(
error, installed_dict, requested_dict, current_requested
)
resolved = resolved and conflict_resolved
elif error['type'] == 'requirement_conflict':
conflict_resolved = self._handle_requirement_conflict(
error, installed_dict, requested_dict, current_requested
)
resolved = resolved and conflict_resolved
return resolved
def _handle_version_conflict(self, error: Dict,
installed_dict: Dict[str, RequirementInfo],
requested_dict: Dict[str, RequirementInfo],
current_requested: List[RequirementInfo]) -> bool:
"""Обрабатывает конфликт версий."""
conflicting_packages = set(error['packages'].keys())
sources = {}
severity = None
# Определяем источники пакетов и их значимость
for pkg in conflicting_packages:
if pkg in installed_dict:
sources[pkg] = 'installed'
elif pkg in requested_dict:
sources[pkg] = 'requested'
else:
sources[pkg] = 'unknown'
# Определяем серьезность конфликта
installed_conflicts = [pkg for pkg in conflicting_packages if sources[pkg] == 'installed']
requested_conflicts = [pkg for pkg in conflicting_packages if sources[pkg] == 'requested']
if len(installed_conflicts) >= 2:
severity = ConflictSeverity.FATAL
elif installed_conflicts and requested_conflicts:
# Проверяем значимость пакетов
installed_severity = max(
installed_dict[pkg].significance_level for pkg in installed_conflicts
)
requested_severity = max(
requested_dict[pkg].significance_level for pkg in requested_conflicts
)
if installed_severity == 'required' and requested_severity == 'required':
severity = ConflictSeverity.FATAL
elif installed_severity == 'required' or requested_severity == 'required':
severity = ConflictSeverity.ERROR
else:
severity = ConflictSeverity.WARNING
else:
# Все пакеты запрашиваемые
severities = [requested_dict[pkg].significance_level for pkg in requested_conflicts]
if 'required' in severities:
severity = ConflictSeverity.ERROR
else:
severity = ConflictSeverity.WARNING
# Создаем запись об ошибке
conflict_error = ConflictError(
conflicting_packages=conflicting_packages,
sources=sources,
severity=severity,
error_message=error['message']
)
self.conflict_errors.append(conflict_error)
# Пытаемся разрешить конфликт
if severity == ConflictSeverity.ERROR:
# Удаляем опциональные пакеты из запрашиваемых
optional_packages = [
pkg for pkg in requested_conflicts
if requested_dict[pkg].significance_level == 'optional'
]
if optional_packages:
for pkg in optional_packages:
self._remove_package(pkg, requested_dict, current_requested)
conflict_error.resolution_action = f"Удален опциональный пакет {pkg}"
return True
elif severity == ConflictSeverity.WARNING:
# Удаляем один из опциональных пакетов
if requested_conflicts:
pkg_to_remove = requested_conflicts[0]
self._remove_package(pkg_to_remove, requested_dict, current_requested)
conflict_error.resolution_action = f"Удален опциональный пакет {pkg_to_remove}"
return True
# Неустранимый конфликт
conflict_error.resolution_action = "Требует ручного вмешательства"
return False
def _handle_requirement_conflict(self, error: Dict,
installed_dict: Dict[str, RequirementInfo],
requested_dict: Dict[str, RequirementInfo],
current_requested: List[RequirementInfo]) -> bool:
"""Обрабатывает конфликт требований."""
pkg = error['package']
sources = {pkg: 'requested' if pkg in requested_dict else 'installed'}
severity = None
# Определяем серьезность конфликта
if pkg in installed_dict:
pkg_severity = installed_dict[pkg].significance_level
else:
pkg_severity = requested_dict[pkg].significance_level
if pkg_severity == 'required':
severity = ConflictSeverity.ERROR
else:
severity = ConflictSeverity.WARNING
# Создаем запись об ошибке
conflict_error = ConflictError(
conflicting_packages={pkg},
sources=sources,
severity=severity,
error_message=error['message']
)
self.conflict_errors.append(conflict_error)
# Пытаемся разрешить конфликт
if severity == ConflictSeverity.ERROR and pkg in requested_dict:
# Для требуемых пакетов пытаемся найти совместимую версию
compatible = self._find_compatible_version(pkg, error['required'], requested_dict)
if compatible:
self._update_package_version(pkg, compatible, requested_dict, current_requested)
conflict_error.resolution_action = f"Обновлена версия {pkg} до {compatible}"
return True
else:
# Не удалось найти совместимую версию
conflict_error.resolution_action = "Не удалось найти совместимую версию"
return False
elif severity == ConflictSeverity.WARNING and pkg in requested_dict:
# Удаляем опциональный пакет
self._remove_package(pkg, requested_dict, current_requested)
conflict_error.resolution_action = f"Удален опциональный пакет {pkg}"
return True
return False
def _remove_package(self, package_name: str,
requested_dict: Dict[str, RequirementInfo],
current_requested: List[RequirementInfo]):
"""Удаляет пакет из списка запрашиваемых."""
if package_name in requested_dict:
package_to_remove = requested_dict[package_name]
current_requested.remove(package_to_remove)
del requested_dict[package_name]
logger.info(f"Удален пакет: {package_name}")
def _update_package_version(self, package_name: str, new_version: str,
requested_dict: Dict[str, RequirementInfo],
current_requested: List[RequirementInfo]):
"""Обновляет версию пакета в списке запрашиваемых."""
if package_name in requested_dict:
old_req = requested_dict[package_name]
new_req_str = f"{package_name}{new_version}"
# Создаем новый RequirementInfo с обновленной версией
new_req = RequirementInfo.from_requirement_string(
new_req_str, old_req.significance_level, old_req.source_file
)
# Заменяем старый запрос на новый
index = current_requested.index(old_req)
current_requested[index] = new_req
requested_dict[package_name] = new_req
logger.info(f"Обновлена версия {package_name} до {new_version}")
def _find_compatible_version(self, package_name: str, requirement: str,
requested_dict: Dict[str, RequirementInfo]) -> Optional[str]:
"""Пытается найти совместимую версию пакета."""
# В реальной реализации здесь должен быть вызов к API PyPI или использование
# библиотеки для разрешения зависимостей, например, pip-tools или poetry
# Это упрощенная реализация
logger.warning(f"Поиск совместимой версии для {package_name} не реализован")
return None

View File

@@ -1,21 +0,0 @@
from pythonapp.Decider.Loader import RequirementInfo
from pythonapp.Env.Env import Env
from pythonapp.Libs.pip_api import pip_api
class PipConflictGrubber:
@classmethod
def check_for_conflicts(cls,
env: Env,
installed_reqs: list[RequirementInfo],
requested_reqs: list[RequirementInfo],
repo_url: str
):
tmp = [str(req) for req in installed_reqs]
tmp.extend([str(req) for req in requested_reqs])
result = pip_api.run_pip_install(env.pip_path, tmp, extra_index_urls=[repo_url])
if not result.success:
raise RuntimeError(result.stderr)

View File

@@ -1,101 +0,0 @@
import subprocess
from pythonapp.Env.Env import Env
import os
from typing import Literal
from pathlib import Path
import subprocess
import re
from dataclasses import dataclass, field
from typing import List, Dict, Set, Tuple, Optional
from enum import Enum
import logging
# Настройка логирования
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ConflictSeverity(Enum):
FATAL = "Fatal"
ERROR = "Error"
WARNING = "Warning"
@dataclass
class ConflictError:
"""Класс для хранения информации о конфликте зависимостей."""
conflicting_packages: Set[str]
sources: Dict[str, str] # package -> source (installed/requested)
severity: ConflictSeverity
error_message: str
resolution_action: str = ""
def __post_init__(self):
# Автоматически определяем источники, если не предоставлены
if not self.sources:
self.sources = {pkg: "unknown" for pkg in self.conflicting_packages}
class Resolver:
pass
"""
Класс для поиска и обработки конфликтов зависимостей
Attributes:
req_list (list[RequirementInfo])
Список установленных пакетов в системе
requirement_str (str):
Полная строка требования как в файле requirements.txt.
Пример: "requests>=2.25.0" или "numpy==1.21.0".
package_name (str):
Имя пакета без указания версии.
Пример: "requests" или "numpy".
significance_level (Literal["manual", "required", "optional"]):
Уровень значимости требования по убыванию важности:
- "manual": пакеты, установленные вручную
- "required": обязательные зависимости
- "optional": опциональные зависимости
source_file (str):
Имя файла требований, из которого было извлечено это требование.
Пример: "core.req" или "dev.opt".
"""
"""
Теперь давай разработаем статический класс Resolver и его метод check integrity. Приведенного ниже алгоритма есть проблема, что он может игнорировать некоторые опциональное зависимости, но не сообщает об этом коду, который далее будет устанавливать пакеты и не удаляет их из self.config.requirements_dir.
ВНИМАНИЕ!!! Этот код не должен менять сами файлы. Он может только удалять пакеты из своих временных списков, но не из файлов. Также он должен быть полностью инкапсулирован от класса Instance. Все необходимые для его работы данные должны быть переданы непосредственно при вызове метода check_integrity. Данные класса instance, ровно, как и принадлежащие ему файлы не должны быть изменены в результате работы. Все что делает этот код - получает исходные данные и на их основе принимает решение и возвращает ответ, никак не трогая сами исходные данные.
В соответствии с этим классом должен быть изменен метод check_integrity класса Instance, но ничего более.
Метод всегда должен возвращать три переменных: список оставшихся пакетов, список ошибок на req, список ошибок на opt. Переделай check integrity класса instance с учетом этого.
Также pip dry run должен проводиться в venv окружении self.config.test_venv
его алгоритм:
ЭТАП Б: Обработка ошибок
1. Пытается запустить pip --dry-run со всеми пакетами из обоих списков. Если ошибок нет, возвращает true
2. Если ошибки есть, то парсит вывод pip, находит пакет, вызвавший конфликт в списке запрашиваемых пакетов, и пакет, который конфликтует с ним в словарях req и opt,
3. Если пакет найден в opt и флаг required == True, удаляет его из словарей списков. Если пакет найден в req или флаг required == False, удаляет пакет из списка запрашиваемых
4. Заново формирует общие списки.
5. Переносит имя нового и установленного конфликтующих пакетов в списки err_req и err_opt dataclass ErrorDesc, где содержится их имена с версиями, а также полный текст ошибки pip.
6. Повторяет процесс обработки ошибок, пока ошибок не останется.
ЭТАП В: Подведение итогов
1. Если флаг interactive, то выводим список ошибок, спрашиваем у пользователя, продолжать или нет.
2. Если флаг required == False, возвращаем список оставшихся запрашиваемых пакетов
3. Иначе если список запрашиваемых пакетов остался неизменным, возвращаем его, если нет, исключение - неразрешимый конфликт.
"""

View File

View File

@@ -1,233 +0,0 @@
import shutil
from pathlib import Path
from typing import Optional, List
from dataclasses import dataclass
from .Environments.venv import Venv
@dataclass
class InstanceConfig:
"""Конфигурация экземпляра приложения"""
path: Path
requirements_dir: Path
manual_requirements: Path
freezed_packages: Path
created: bool = False
class Instance:
"""Базовый класс для управления изолированными окружениями"""
def __init__(self, path: str, env: str = "venv"):
self.path = Path(path)
self.config = InstanceConfig(
path=self.path,
requirements_dir=self.path / ".vaiola" / "requirements.d",
manual_requirements=self.path / ".vaiola" / "requirements.txt",
freezed_packages=self.path / ".vaiola" / "freezed_packages.txt",
created=(self.path / ".vaiola").exists()
)
self.freezed_packages_list: List[str] = []
if env == "venv":
self.env = Venv(self.path / "venv")
if not self.config.created:
self.env.create()
self.config.requirements_dir.mkdir(parents=True, exist_ok=True)
self.config.freezed_packages.touch()
self.config.manual_requirements.touch()
else:
self._load_freezed_packages()
def _load_freezed_packages(self) -> None:
"""Загружает список замороженных пакетов из файла"""
if self.config.freezed_packages.exists():
with open(self.config.freezed_packages, 'r') as f:
self.freezed_packages_list = [
line.strip() for line in f.readlines() if line.strip()
]
def _parse_package_name(self, package_spec: str) -> str:
"""Извлекает имя пакета из спецификации (убирает версию и доп. параметры)"""
# Удаляем версии и дополнительные параметры
name = re.split(r'[<>=!~]', package_spec)[0].strip()
# Удаляем экстра-параметры (в квадратных скобках)
if '[' in name:
name = name.split('[')[0].strip()
return name
def _is_package_freezed(self, package_spec: str) -> bool:
"""Проверяет, заморожен ли пакет"""
package_name = self._parse_package_name(package_spec)
return package_name in self.freezed_packages_list
def _remove_package_from_file(self, package_spec: str, file_path: Path) -> None:
"""Удаляет пакет из указанного файла"""
package_name = self._parse_package_name(package_spec)
if file_path.exists():
with open(file_path, 'r') as f:
lines = f.readlines()
# Фильтруем строки, убирая те, что содержат этот пакет
filtered_lines = [
line for line in lines
if self._parse_package_name(line.strip()) != package_name
]
with open(file_path, 'w') as f:
f.writelines(filtered_lines)
def check_consistency(self) -> bool:
"""Проверяет целостность окружения"""
# TODO: Реализовать проверку целостности
return True
def install_requirements(
self,
requirements: List[str],
name: str,
force: bool = False,
extra_index_url: Optional[str] = None
) -> None:
"""Устанавливает требования из списка"""
requirements_file = self.config.requirements_dir / f"{name}.req"
requirements_file.write_text("\n".join(requirements))
if force:
self.env.install_requirements(requirements, extra_index_url)
def install_requirements_file(
self,
requirements_file: str,
name: str,
force: bool = False,
requirement_level: str = 'req',
extra_index_url: Optional[str] = None
) -> None:
"""Устанавливает требования из файла"""
target_file = self.config.requirements_dir / f"{name}.{requirement_level}"
shutil.copy2(requirements_file, target_file)
if force:
self.env.install_requirements_file(requirements_file, extra_index_url)
def install_package(
self,
package: str,
extra_index_url: Optional[str] = None,
force: bool = False,
freeze: bool = False
) -> None:
"""
Устанавливает пакет с учетом всех требований
Args:
package: Спецификация пакета в формате pip
extra_index_url: Дополнительный URL репозитория
force: Принудительная установка, даже если пакет заморожен
freeze: Заморозить пакет после установки
"""
# Шаг 1: Проверка флага force
if not force:
# Шаг 2: Проверка на заморозку
if self._is_package_freezed(package):
raise ValueError(f"Пакет {self._parse_package_name(package)} заморожен и не может быть установлен")
# Шаг 3: Проверка целостности
if not self.check_integrity():
raise RuntimeError("Невозможно установить пакет: проверка целостности не пройдена")
# Шаг 4: Установка пакета
try:
self.env.install_package(package, extra_index_url)
except Exception as e:
raise RuntimeError(f"Ошибка установки пакета {package}: {e}")
# Шаг 5: Удаление существующих упоминаний пакета
self._remove_package_from_file(package, self.config.manual_requirements)
# Шаг 6: Добавление пакета в manual_requirements
with open(self.config.manual_requirements, 'a') as f:
f.write(f"{package}\n")
# Шаг 7: Проверка флага freeze
if not freeze:
return
# Шаг 8: Удаление из freezed_packages
self._remove_package_from_file(package, self.config.freezed_packages)
self._load_freezed_packages() # Обновляем список
# Шаг 9: Добавление в freezed_packages (только имя пакета)
package_name = self._parse_package_name(package)
with open(self.config.freezed_packages, 'a') as f:
f.write(f"{package_name}\n")
self.freezed_packages_list.append(package_name)
def install_optional_requirements(
self,
requirements_file: str,
name: str,
force: bool = False
) -> None:
"""Устанавливает опциональные требования"""
self.install_requirements_file(
requirements_file, name, force, requirement_level='opt'
)
class GenericPyTorchInstance(Instance):
"""Базовый класс для экземпляров с PyTorch"""
BASE_URL = "https://download.pytorch.org/whl/"
def __init__(
self,
path: str,
api: str,
torch_version: str,
torchvision_version: str,
torchaudio_version: str,
env: str = "venv",
freeze_torch: bool = True
):
super().__init__(path, env)
self.url = f"{self.BASE_URL.rstrip('/')}/{api}"
self.torch_version = torch_version
self.torchvision_version = torchvision_version
self.torchaudio_version = torchaudio_version
# Установка PyTorch компонентов
self.install_package(f"torch=={torch_version}", self.url, force=True)
self.install_package(f"torchvision=={torchvision_version}", self.url, force=True)
self.install_package(f"torchaudio=={torchaudio_version}", self.url, force=True)
if freeze_torch:
self.freezed_packages.extend([
f"torch=={torch_version}",
f"torchvision=={torchvision_version}",
f"torchaudio=={torchaudio_version}"
])
class ComfyUIInstance(GenericPyTorchInstance):
"""Специализированный класс для ComfyUI"""
def __init__(
self,
path: str,
api: str,
torch_version: str,
torchvision_version: str,
torchaudio_version: str,
env: str = "venv"
):
super().__init__(
path, api, torch_version, torchvision_version, torchaudio_version, env
)
self.app_dir = self.path / 'app'

View File

@@ -1,257 +0,0 @@
import re
import subprocess
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Union
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class ErrorDesc:
"""Контейнер для информации об ошибках разрешения зависимостей"""
package: str
conflict_package: str
error_message: str
class Resolver:
"""Статический класс для разрешения зависимостей пакетов"""
@staticmethod
def _parse_package_name(package_spec: str) -> str:
"""Извлекает чистое имя пакета из спецификации"""
# Удаляем версии, условия и экстра-параметры
name = re.split(r'[<>=!~\[\]]', package_spec)[0].strip()
return name
@staticmethod
def _load_packages_from_file(file_path: Path) -> List[str]:
"""Загружает список пакетов из файла"""
if not file_path.exists():
return []
with open(file_path, 'r') as f:
lines = [line.strip() for line in f.readlines() if line.strip() and not line.startswith('#')]
return lines
@staticmethod
def _load_packages_from_dir(dir_path: Path, extension: str) -> Dict[str, List[str]]:
"""Загружает пакеты из всех файлов в директории с указанным расширением"""
packages_dict = {}
if not dir_path.exists():
return packages_dict
for file_path in dir_path.glob(f"*.{extension}"):
key = file_path.stem
packages_dict[key] = Resolver._load_packages_from_file(file_path)
return packages_dict
@staticmethod
def _remove_freezed_packages(packages_list: List[str], freezed_packages: List[str]) -> List[str]:
"""Удаляет замороженные пакеты из списка"""
freezed_names = [Resolver._parse_package_name(pkg) for pkg in freezed_packages]
return [
pkg for pkg in packages_list
if Resolver._parse_package_name(pkg) not in freezed_names
]
@staticmethod
def _run_pip_dry_run(packages: List[str]) -> Tuple[bool, str, Optional[str]]:
"""Запускает pip install --dry-run и возвращает результат"""
if not packages:
return True, "", None
command = ["python", "-m", "pip", "install", "--dry-run"] + packages
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return True, result.stdout, None
else:
return False, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return False, "", "Timeout exceeded"
except Exception as e:
return False, "", str(e)
@staticmethod
def _find_conflicting_packages(error_output: str, all_packages: List[str]) -> Tuple[Optional[str], Optional[str]]:
"""Анализирует вывод pip для поиска конфликтующих пакетов"""
# Паттерны для поиска конфликтующих пакетов в выводе pip
patterns = [
r"ERROR: Cannot install (.+?) because (.+?) conflicts with (.+?)",
r"ERROR: (.+?) is incompatible with (.+?)",
r"Conflict: (.+?) conflicts with (.+?)",
]
for pattern in patterns:
match = re.search(pattern, error_output)
if match:
groups = match.groups()
for i in range(len(groups)):
for package in all_packages:
if groups[i] in package:
if i + 1 < len(groups):
for conflict_package in all_packages:
if groups[i + 1] in conflict_package:
return package, conflict_package
break
return None, None
@staticmethod
def check_integrity(
requested_packages: List[str],
manual_requirements_path: Path,
requirements_dir: Path,
freezed_packages_path: Path,
required: bool = True,
interactive: bool = False
) -> Union[List[str], Tuple[List[str], List[ErrorDesc]]]:
"""
Проверяет целостность и разрешает конфликты зависимостей
Returns:
Union[List[str], Tuple[List[str], List[ErrorDesc]]]:
Список пакетов или кортеж (список пакетов, список ошибок)
"""
# ЭТАП A: Загрузка
freezed_packages = Resolver._load_packages_from_file(freezed_packages_path)
# Загружаем manual requirements
manual_packages = Resolver._load_packages_from_file(manual_requirements_path)
manual_packages = Resolver._remove_freezed_packages(manual_packages, freezed_packages)
# Загружаем req и opt файлы
req_dict = Resolver._load_packages_from_dir(requirements_dir, "req")
opt_dict = Resolver._load_packages_from_dir(requirements_dir, "opt")
# Удаляем замороженные пакеты из всех словарей
for key in list(req_dict.keys()):
req_dict[key] = Resolver._remove_freezed_packages(req_dict[key], freezed_packages)
for key in list(opt_dict.keys()):
opt_dict[key] = Resolver._remove_freezed_packages(opt_dict[key], freezed_packages)
# Формируем общие списки
all_req = manual_packages + [pkg for packages in req_dict.values() for pkg in packages]
all_opt = [pkg for packages in opt_dict.values() for pkg in packages]
# Удаляем замороженные пакеты из запрошенных
requested_packages = Resolver._remove_freezed_packages(requested_packages, freezed_packages)
# ЭТАП Б: Обработка ошибок
all_packages = all_req + all_opt + requested_packages
errors = []
while True:
success, stdout, stderr = Resolver._run_pip_dry_run(all_packages)
if success:
break
if not stderr:
raise RuntimeError("Pip failed but no error output available")
# Ищем конфликтующие пакеты
conflict_package, conflict_with = Resolver._find_conflicting_packages(stderr, all_packages)
if not conflict_package or not conflict_with:
# Если не удалось определить конфликт, используем эвристику
if required:
if all_opt:
# Удаляем первый опциональный пакет
removed_package = all_opt.pop(0)
all_packages.remove(removed_package)
errors.append(ErrorDesc(
package=removed_package,
conflict_package="unknown",
error_message=stderr
))
continue
else:
if requested_packages:
# Удаляем первый запрошенный пакет
removed_package = requested_packages.pop(0)
all_packages.remove(removed_package)
errors.append(ErrorDesc(
package=removed_package,
conflict_package="unknown",
error_message=stderr
))
continue
# Если нечего удалять, выбрасываем исключение
raise RuntimeError(f"Unresolvable conflict: {stderr}")
# Определяем, в каком списке находится конфликтующий пакет
conflict_in_opt = any(conflict_package in packages for packages in opt_dict.values())
conflict_in_req = any(conflict_package in packages for packages in req_dict.values()) or conflict_package in manual_packages
# Определяем, в каком списке находится пакет, с которым конфликт
conflict_with_in_opt = any(conflict_with in packages for packages in opt_dict.values())
conflict_with_in_req = any(conflict_with in packages for packages in req_dict.values()) or conflict_with in manual_packages
# Принимаем решение об удалении пакета
if conflict_in_opt and required:
# Удаляем из опциональных
for key in list(opt_dict.keys()):
if conflict_package in opt_dict[key]:
opt_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
else:
# Удаляем из запрошенных
if conflict_package in requested_packages:
requested_packages.remove(conflict_package)
all_packages.remove(conflict_package)
else:
# Если конфликтующий пакет не в запрошенных, удаляем его из всех списков
for key in list(req_dict.keys()):
if conflict_package in req_dict[key]:
req_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
for key in list(opt_dict.keys()):
if conflict_package in opt_dict[key]:
opt_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
# Добавляем информацию об ошибке
errors.append(ErrorDesc(
package=conflict_package,
conflict_package=conflict_with,
error_message=stderr
))
# ЭТАП В: Подведение итогов
if interactive and errors:
print("Обнаружены конфликты зависимостей:")
for error in errors:
print(f"- {error.package} конфликтует с {error.conflict_package}")
print(f" Ошибка: {error.error_message[:100]}...")
response = input("Продолжить установку с разрешенными конфликтами? (y/n): ")
if response.lower() != 'y':
raise RuntimeError("Установка отменена пользователем")
if not required:
return requested_packages
else:
# Проверяем, изменился ли список запрошенных пакетов
original_requested = set(requested_packages)
current_requested = set(requested_packages)
if original_requested != current_requested:
raise RuntimeError("Неразрешимый конфликт: пришлось удалить запрошенные пакеты")
return requested_packages

View File

@@ -1,41 +0,0 @@
from ..Web.pytorch import *
from ..Instance import *
class NumericMenu:
@staticmethod
def print_menu(items: list[str], prompt: str = "Choise: ") -> int:
for i in range(len(items)): print(f'{i + 1}. {items[i]}')
return int(input(prompt))
@classmethod
def menu_create_instance_generic_pytorch_application(cls):
api = input("Type your api version code. Ex. cu121 for cuda 12.1: ")
t = pytorch.versions_torch(api)
choise = cls.print_menu(t, "torch version: ")
t = t[choise - 1]
tv = pytorch.versions_torchvision(api)
choise = cls.print_menu(tv, "torchvision version: ")
tv = tv[choise - 1]
ta = pytorch.versions_torchaudio(api)
choise = cls.print_menu(ta, "torchaudio version: ")
ta = ta[choise - 1]
print("Pytorch version:", t, "torchvision:", tv, "torchaudio:", ta)
path = input("Instance path: ")
instance = GenericPytorchInstance(path, api, t, tv, ta)
@classmethod
def menu_create_instance(cls):
choise = cls.print_menu(['Generic PyTorch Application', 'ComfyUI', 'Stable Diffusion Forge'])
if choise == 1: cls.menu_create_instance_generic_pytorch_application()
@classmethod
def start(cls):
choise = cls.print_menu(['Create instance'])
if choise == 1:
cls.menu_create_instance()

View File

@@ -1,3 +1,4 @@
from modelspace.ModelPackageSelector import format_bytes
from modelspace.ModelSpace import ModelSpace
from shell.Handlers.ABS import Handler
from modelspace.Repository import global_repo
@@ -14,6 +15,8 @@ class ModelSpaceHandler(Handler):
'install-all': self._install_all,
# 'create': self._create,
'load': self._load,
'list': self._list,
'debug': self._debug,
# 'show': self._show,
# 'activate': self._activate,
@@ -55,7 +58,8 @@ class ModelSpaceHandler(Handler):
# keys, args = self.parse_arguments(command[pos:], ['env', 'path', 'python'])
def _install(self, command: list[str], pos = 0):
for resource in command[pos:]: self._active_instance.install(resource)
keys, args = self.parse_arguments(command[pos:], ['answer'])
for res in args: self._active_instance.install(res, keys['answer'])
self.succeed = True
def _install_all(self, command: list[str], pos = 0):
@@ -63,3 +67,15 @@ class ModelSpaceHandler(Handler):
self._active_instance.install(resource, answer='all')
self.succeed = True
def _list(self, command: list[str], pos = 0):
keys, args = self.parse_arguments(command[pos:], ['long'])
packages = list(self._active_instance.installed_packages)
lines = [f'{p.name:<{30}} {p.quantization:<{10}} {format_bytes(p.size_bytes):<{10}} {p.version:<{15}}' for p in packages]
lines.sort()
for line in lines:
print(line)
def _debug(self, command: list[str], pos = 0):
self.succeed = True