Files
vaiola/deprecated/program/Engine/Resolver.py
2025-09-12 17:18:13 +07:00

258 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import subprocess
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Union
from dataclasses import dataclass
from collections import defaultdict
@dataclass
class ErrorDesc:
"""Контейнер для информации об ошибках разрешения зависимостей"""
package: str
conflict_package: str
error_message: str
class Resolver:
"""Статический класс для разрешения зависимостей пакетов"""
@staticmethod
def _parse_package_name(package_spec: str) -> str:
"""Извлекает чистое имя пакета из спецификации"""
# Удаляем версии, условия и экстра-параметры
name = re.split(r'[<>=!~\[\]]', package_spec)[0].strip()
return name
@staticmethod
def _load_packages_from_file(file_path: Path) -> List[str]:
"""Загружает список пакетов из файла"""
if not file_path.exists():
return []
with open(file_path, 'r') as f:
lines = [line.strip() for line in f.readlines() if line.strip() and not line.startswith('#')]
return lines
@staticmethod
def _load_packages_from_dir(dir_path: Path, extension: str) -> Dict[str, List[str]]:
"""Загружает пакеты из всех файлов в директории с указанным расширением"""
packages_dict = {}
if not dir_path.exists():
return packages_dict
for file_path in dir_path.glob(f"*.{extension}"):
key = file_path.stem
packages_dict[key] = Resolver._load_packages_from_file(file_path)
return packages_dict
@staticmethod
def _remove_freezed_packages(packages_list: List[str], freezed_packages: List[str]) -> List[str]:
"""Удаляет замороженные пакеты из списка"""
freezed_names = [Resolver._parse_package_name(pkg) for pkg in freezed_packages]
return [
pkg for pkg in packages_list
if Resolver._parse_package_name(pkg) not in freezed_names
]
@staticmethod
def _run_pip_dry_run(packages: List[str]) -> Tuple[bool, str, Optional[str]]:
"""Запускает pip install --dry-run и возвращает результат"""
if not packages:
return True, "", None
command = ["python", "-m", "pip", "install", "--dry-run"] + packages
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
return True, result.stdout, None
else:
return False, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return False, "", "Timeout exceeded"
except Exception as e:
return False, "", str(e)
@staticmethod
def _find_conflicting_packages(error_output: str, all_packages: List[str]) -> Tuple[Optional[str], Optional[str]]:
"""Анализирует вывод pip для поиска конфликтующих пакетов"""
# Паттерны для поиска конфликтующих пакетов в выводе pip
patterns = [
r"ERROR: Cannot install (.+?) because (.+?) conflicts with (.+?)",
r"ERROR: (.+?) is incompatible with (.+?)",
r"Conflict: (.+?) conflicts with (.+?)",
]
for pattern in patterns:
match = re.search(pattern, error_output)
if match:
groups = match.groups()
for i in range(len(groups)):
for package in all_packages:
if groups[i] in package:
if i + 1 < len(groups):
for conflict_package in all_packages:
if groups[i + 1] in conflict_package:
return package, conflict_package
break
return None, None
@staticmethod
def check_integrity(
requested_packages: List[str],
manual_requirements_path: Path,
requirements_dir: Path,
freezed_packages_path: Path,
required: bool = True,
interactive: bool = False
) -> Union[List[str], Tuple[List[str], List[ErrorDesc]]]:
"""
Проверяет целостность и разрешает конфликты зависимостей
Returns:
Union[List[str], Tuple[List[str], List[ErrorDesc]]]:
Список пакетов или кортеж (список пакетов, список ошибок)
"""
# ЭТАП A: Загрузка
freezed_packages = Resolver._load_packages_from_file(freezed_packages_path)
# Загружаем manual requirements
manual_packages = Resolver._load_packages_from_file(manual_requirements_path)
manual_packages = Resolver._remove_freezed_packages(manual_packages, freezed_packages)
# Загружаем req и opt файлы
req_dict = Resolver._load_packages_from_dir(requirements_dir, "req")
opt_dict = Resolver._load_packages_from_dir(requirements_dir, "opt")
# Удаляем замороженные пакеты из всех словарей
for key in list(req_dict.keys()):
req_dict[key] = Resolver._remove_freezed_packages(req_dict[key], freezed_packages)
for key in list(opt_dict.keys()):
opt_dict[key] = Resolver._remove_freezed_packages(opt_dict[key], freezed_packages)
# Формируем общие списки
all_req = manual_packages + [pkg for packages in req_dict.values() for pkg in packages]
all_opt = [pkg for packages in opt_dict.values() for pkg in packages]
# Удаляем замороженные пакеты из запрошенных
requested_packages = Resolver._remove_freezed_packages(requested_packages, freezed_packages)
# ЭТАП Б: Обработка ошибок
all_packages = all_req + all_opt + requested_packages
errors = []
while True:
success, stdout, stderr = Resolver._run_pip_dry_run(all_packages)
if success:
break
if not stderr:
raise RuntimeError("Pip failed but no error output available")
# Ищем конфликтующие пакеты
conflict_package, conflict_with = Resolver._find_conflicting_packages(stderr, all_packages)
if not conflict_package or not conflict_with:
# Если не удалось определить конфликт, используем эвристику
if required:
if all_opt:
# Удаляем первый опциональный пакет
removed_package = all_opt.pop(0)
all_packages.remove(removed_package)
errors.append(ErrorDesc(
package=removed_package,
conflict_package="unknown",
error_message=stderr
))
continue
else:
if requested_packages:
# Удаляем первый запрошенный пакет
removed_package = requested_packages.pop(0)
all_packages.remove(removed_package)
errors.append(ErrorDesc(
package=removed_package,
conflict_package="unknown",
error_message=stderr
))
continue
# Если нечего удалять, выбрасываем исключение
raise RuntimeError(f"Unresolvable conflict: {stderr}")
# Определяем, в каком списке находится конфликтующий пакет
conflict_in_opt = any(conflict_package in packages for packages in opt_dict.values())
conflict_in_req = any(conflict_package in packages for packages in req_dict.values()) or conflict_package in manual_packages
# Определяем, в каком списке находится пакет, с которым конфликт
conflict_with_in_opt = any(conflict_with in packages for packages in opt_dict.values())
conflict_with_in_req = any(conflict_with in packages for packages in req_dict.values()) or conflict_with in manual_packages
# Принимаем решение об удалении пакета
if conflict_in_opt and required:
# Удаляем из опциональных
for key in list(opt_dict.keys()):
if conflict_package in opt_dict[key]:
opt_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
else:
# Удаляем из запрошенных
if conflict_package in requested_packages:
requested_packages.remove(conflict_package)
all_packages.remove(conflict_package)
else:
# Если конфликтующий пакет не в запрошенных, удаляем его из всех списков
for key in list(req_dict.keys()):
if conflict_package in req_dict[key]:
req_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
for key in list(opt_dict.keys()):
if conflict_package in opt_dict[key]:
opt_dict[key].remove(conflict_package)
all_packages.remove(conflict_package)
break
# Добавляем информацию об ошибке
errors.append(ErrorDesc(
package=conflict_package,
conflict_package=conflict_with,
error_message=stderr
))
# ЭТАП В: Подведение итогов
if interactive and errors:
print("Обнаружены конфликты зависимостей:")
for error in errors:
print(f"- {error.package} конфликтует с {error.conflict_package}")
print(f" Ошибка: {error.error_message[:100]}...")
response = input("Продолжить установку с разрешенными конфликтами? (y/n): ")
if response.lower() != 'y':
raise RuntimeError("Установка отменена пользователем")
if not required:
return requested_packages
else:
# Проверяем, изменился ли список запрошенных пакетов
original_requested = set(requested_packages)
current_requested = set(requested_packages)
if original_requested != current_requested:
raise RuntimeError("Неразрешимый конфликт: пришлось удалить запрошенные пакеты")
return requested_packages