Harden image fetch algorithm

Prepare for database integration
This commit is contained in:
2025-09-24 20:07:19 +07:00
parent 88d40c0d99
commit 3a88bdad3a
17 changed files with 600 additions and 26 deletions

View File

@@ -0,0 +1,53 @@
import json
import atexit
from dataclasses import dataclass, asdict, fields
from typing import Any, Dict
from pathlib import Path
@dataclass
class Config:
filename: str
autosave: bool = False
def __post_init__(self):
# Загружаем значения из файла при создании экземпляра
self.load()
# Регистрируем автоматическое сохранение при завершении программы
if self.autosave:
atexit.register(self.save)
def load(self) -> None:
"""Загружает значения полей из файла"""
try:
if Path(self.filename).exists():
with open(self.filename, 'r') as f:
data = json.load(f)
for field in fields(self):
if field.name in data and field.name != 'filename':
setattr(self, field.name, data[field.name])
except Exception as e:
print(f"Error loading config: {e}")
def save(self) -> None:
"""Сохраняет текущие значения полей в файл"""
try:
# Преобразуем объект в словарь, исключая поле filename
data = asdict(self)
data.pop('filename', None)
# Создаем директорию, если она не существует
Path(self.filename).parent.mkdir(parents=True, exist_ok=True)
# Сохраняем в файл с кастомным сериализатором для Path объектов
with open(self.filename, 'w') as f:
json.dump(data, f, indent=4, default=self._json_serializer)
except Exception as e:
print(f"Error saving config: {e}")
def _json_serializer(self, obj: Any) -> Any:
"""Кастомный сериализатор для объектов, которые не могут быть сериализованы по умолчанию"""
if isinstance(obj, Path):
return str(obj)
raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

View File

@@ -0,0 +1,90 @@
from dataclasses import dataclass, fields
from typing import Optional, List
from DataClassJson import DataClassJson
from modules.shared.DatabaseAbstraction import Cursor
types = {bool: 'INTEGER', int: 'INTEGER', float: 'REAL', str: "TEXT",
Optional[bool]: 'INTEGER', Optional[int]: 'INTEGER', Optional[float]: 'REAL', Optional[str]: "TEXT", }
@dataclass
class DataClassDatabase(DataClassJson):
_main_entity: bool = None
_table_name: str = None
pass
def __post_init__(self):
super().__post_init__()
@classmethod
def get_create_sqls(cls, table_name = None):
result: list[str] = list()
result.append(f'CREATE TABLE IF NOT EXISTS {table_name} (fk TEXT NOT NULL, pk TEXT NOT NULL, PRIMARY KEY(pk, fk));')
tmp_instance = cls()
if not table_name: table_name = tmp_instance._table_name
excluded_fields = {f.name for f in fields(DataClassDatabase)}
all_fields = [f for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')]
for field in all_fields:
if field.name in tmp_instance._forwarding:
inner_type: type = tmp_instance._forwarding[field.name]
try: result.extend(inner_type.get_create_sqls())
except Exception as e: raise RuntimeError('invalid forwarding type') from e
elif field.type in { list, Optional[list], Optional[List] }:
result.append(f'CREATE TABLE IF NOT EXISTS {table_name}_{field.name} (fk TEXT NOT NULL, data TEXT NOT NULL);')
else:
result.append(f'ALTER TABLE {table_name} ADD COLUMN {field.name} {types.get(field.type, 'TEXT')};')
return result
@classmethod
def create(cls, cur: Cursor):
for sql in cls.get_create_sqls(): cur.execute(sql)
if __name__ == '__main__':
@dataclass
class ModelStats(DataClassDatabase):
downloadCount: Optional[int] = None
favoriteCount: Optional[int] = None
thumbsUpCount: Optional[int] = None
thumbsDownCount: Optional[int] = None
commentCount: Optional[int] = None
ratingCount: Optional[int] = None
rating: Optional[int] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
@dataclass
class Model(DataClassDatabase):
id: Optional[int] = None
name: Optional[str] = None
description: Optional[str] = None
allowNoCredit: Optional[bool] = None
allowCommercialUse: Optional[list] = None
allowDerivatives: Optional[bool] = None
allowDifferentLicense: Optional[bool] = None
type: Optional[str] = None
minor: Optional[bool] = None
sfwOnly: Optional[bool] = None
poi: Optional[bool] = None
nsfw: Optional[bool] = None
nsfwLevel: Optional[int] = None
availability: Optional[str] = None
cosmetic: Optional[str] = None
supportsGeneration: Optional[bool] = None
stats: Optional[ModelStats] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {
'stats': ModelStats,
}
self._key_field = 'id'
self._table_name = 'gagaga'
for s in Model.get_create_sqls():
print(s)

View File

@@ -0,0 +1,185 @@
from dataclasses import dataclass, field, fields
from typing import Dict, Any, Optional
import warnings
# Определим базовый класс для удобного наследования
@dataclass
class DataClassJson:
_forwarding: Dict[str, type] = field(default_factory=dict)
_key_field: str = 'key' # Поле, которое будет использоваться как ключ
fixed: bool = False
# Скрытые поля для хранения данных
_key: Optional[str] = None
other_data: Optional[Dict[str, Any]] = None
# Пример поля, которое будет использоваться в _forwarding
# Должно быть переопределено в дочерних классах
key: Optional[str] = None
def __post_init__(self):
if self._key is not None:
self.key = self._key
@property
def key(self) -> Optional[str]:
return self._key
@key.setter
def key(self, value: str):
self._key = value
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'DataClassJson':
# Создаем экземпляр класса
instance = cls()
instance.fixed = data.get('fixed', False)
instance.other_data = None
# Список всех полей
excluded_fields = {f.name for f in fields(DataClassJson)}
all_fields = {f.name for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')}
# Обрабатываем поля из _forwarding
handled_keys = set()
field_values = {}
for key, value in data.items():
if key in handled_keys:
continue
if key in instance._forwarding:
target_type = instance._forwarding[key]
if isinstance(value, dict):
# Обрабатываем словарь
sub_instance = target_type.from_dict(value)
field_values[key] = sub_instance
handled_keys.add(key)
elif isinstance(value, list):
# Обрабатываем список словарей
results = []
for item in value:
if isinstance(item, dict):
sub_instance = target_type.from_dict(item)
results.append(sub_instance)
else:
# Если элемент не словарь, записываем в other_data
warnings.warn(f"Non-dict value {item} in list for field '{key}' will be added to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = item # Сохраняем оригинал
field_values[key] = results
handled_keys.add(key)
else:
# Если не словарь и не список, тоже добавляем в other_data
warnings.warn(f"Non-dict/list value {value} for field '{key}' will be added to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
else:
# Обычное поле
if key in all_fields:
field_values[key] = value
handled_keys.add(key)
else:
# Неизвестное поле, добавляем в other_data
warnings.warn(f"Unknown field '{key}', adding to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
# Заполняем обычные поля
for key, value in field_values.items():
setattr(instance, key, value)
# Устанавливаем ключ, если есть
if hasattr(instance, '_key_field') and instance._key_field in data:
instance.key = data[instance._key_field]
# Проверяем флаг fixed и other_data
if instance.fixed and instance.other_data is not None:
raise ValueError("Cannot serialize with fixed=True and non-empty other_data")
return instance
def to_dict(self) -> Dict[str, Any]:
result = {}
excluded_fields = {f.name for f in fields(DataClassJson)}
field_names = [f.name for f in fields(self) if f.name not in excluded_fields and not f.name.startswith('_')]
for field_name in field_names:
if not hasattr(self, field_name):
result[field_name] = None
warnings.warn(f'object not have field {field_name}, something went wrong')
continue
value = getattr(self, field_name)
if not value:
result[field_name] = None
warnings.warn(f'object not have data in field {field_name}, it may be correct situation')
continue
if field_name in self._forwarding:
target_type = self._forwarding[field_name]
result[field_name] = list()
single = False
if not isinstance(value, list):
single = True
value = [value]
for v in value:
try:
v = v.to_dict()
except Exception as e:
warnings.warn(str(e))
finally:
result[field_name].append(v)
if single: result[field_name] = result[field_name][0]
continue
else: result[field_name] = value
# Добавляем other_data, если есть
if self.other_data and isinstance(self.other_data, dict):
for key, value in self.other_data.items():
if key not in result:
result[key] = value
else:
if not isinstance(result[key], list): result[key] = [result[key]]
if not isinstance(value, list): value = [value]
result[key].extend(value)
return result
# Пример использования:
@dataclass
class Person(DataClassJson):
name: Optional[str] = None
age: Optional[int] = None
email: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'name'
@dataclass
class User(DataClassJson):
id: Optional[list] = None
username: Optional[str] = None
person: Optional[Person] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {'person': Person}
self._key_field = 'username'
# Пример десериализации:
if __name__ == "__main__":
data = {
"id": [1,2,3,4,5,6],
"username": "user1",
"person": None,
"extra_field": "should_be_in_other_data"
}
user = User.from_dict(data)
data2 = user.to_dict()
print(user.to_dict())

View File

@@ -0,0 +1,52 @@
class Cursor:
def __init__(self, cursor):
pass
def execute(self, sql: str, params: list = None) -> None:
pass
def fetchone(self, sql: str, params: list = None) -> dict:
pass
def fetchmany(self, sql: str = None, params: list = None) -> list[dict]:
pass
def fetchall(self, sql: str, params: list = None) -> list[dict]:
pass
def lastrowid(self):
pass
class Database:
def __init__(self, name: str):
self.name = name
self.connected = False
def commit(self):
pass
def cursor(self) -> Cursor:
pass
class DBContainer:
def __init__(self, db: Database):
self.db: Database = db
def switch_db(self, db: Database):
self.db.commit()
self.db: Database = db
@property
def connected(self) -> bool:
return self.db.connected
def commit(self):
self.db.commit()
def cursor(self) -> Cursor:
return self.db.cursor()

View File

@@ -0,0 +1,91 @@
from pathlib import Path
from .DatabaseAbstraction import Database, Cursor
import sqlite3 as sq
class SQLiteCursor(Cursor):
def __init__(self, cursor):
super().__init__(cursor)
self._cursor = cursor
def execute(self, sql: str, params: list = None) -> None:
"""Выполняет SQL запрос"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
def fetchone(self, sql: str, params: list = None) -> dict:
"""Получает одну строку результата"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
row = self._cursor.fetchone()
if row is None:
return None
# Преобразуем в словарь с именами колонок
columns = [description[0] for description in self._cursor.description]
return dict(zip(columns, row))
def fetchmany(self, sql: str = None, params: list = None) -> list[dict]:
"""Получает несколько строк результата"""
if sql is not None:
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
rows = self._cursor.fetchmany()
if not rows:
return []
# Преобразуем в список словарей
columns = [description[0] for description in self._cursor.description]
return [dict(zip(columns, row)) for row in rows]
def fetchall(self, sql: str, params: list = None) -> list[dict]:
"""Получает все строки результата"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
rows = self._cursor.fetchall()
if not rows:
return []
# Преобразуем в список словарей
columns = [description[0] for description in self._cursor.description]
return [dict(zip(columns, row)) for row in rows]
def lastrowid(self):
"""Возвращает ID последней вставленной строки"""
return self._cursor.lastrowid
class SQLiteDatabase(Database):
def __init__(self, name: str, path = '.'):
super().__init__(name)
self._connection: sq.Connection = sq.connect(Path(path) / (name + '.db'))
self._connection.autocommit = True
self._connection.row_factory = sq.Row # Для получения словарей
self.connected = True
def commit(self):
"""Фиксирует транзакцию"""
if self.connected and self._connection:
self._connection.commit()
def cursor(self) -> Cursor:
"""Создает и возвращает курсор"""
return SQLiteCursor(self._connection.cursor())
def close(self):
"""Закрывает соединение с базой данных"""
if self.connected and self._connection:
self._connection.close()
self.connected = False

View File

@@ -0,0 +1,23 @@
class ListsDict:
def __init__(self):
self._data: dict[str, list] = dict()
def add(self, key, value):
if key not in self._data: self._data[key] = list()
if value not in self._data[key]: self._data[key].append(value)
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = list()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.append(elem)
return res
def by_key(self, key):
return self._data.get(key, None)

View File

@@ -0,0 +1,25 @@
class SetsDict:
def __init__(self):
self._data: dict[str, set] = dict()
def add(self, key, value):
if key not in self._data: self._data[key] = set()
if value not in self._data[key]: self._data[key].add(value)
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = set()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.add(elem)
return res
def by_key(self, key):
return self._data.get(key, None)
@property
def keys(self): return self._data.keys()

View File

View File

@@ -0,0 +1,11 @@
def format_bytes(bytes_size):
"""Convert bytes to human readable format"""
if bytes_size < 1024:
return f"{bytes_size} B"
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} PB"

View File

@@ -0,0 +1,44 @@
def select_elements(lst, selection_string):
"""
Выбирает элементы из списка согласно строке выбора
Args:
lst: Исходный список
selection_string: Строка вида "1 2 4-6 all"
Returns:
Новый список с выбранными элементами, отсортированными по номерам
"""
selection_string = selection_string.strip()
if not selection_string.strip():
return []
if selection_string == "all":
return lst.copy()
selected_indices = set()
parts = selection_string.split()
for part in parts:
if '-' in part:
# Обработка диапазона
start, end = map(int, part.split('-'))
# Обработка диапазона в любом направлении
if start <= end:
selected_indices.update(range(start, end + 1))
else:
selected_indices.update(range(start, end - 1, -1))
else:
# Обработка отдельного элемента
selected_indices.add(int(part))
# Преобразуем в список и сортируем по номерам
sorted_indices = sorted(selected_indices)
# Выбираем элементы
result = []
for idx in sorted_indices:
if 0 <= idx < len(lst):
result.append(lst[idx])
return result