Compare commits

...

10 Commits

Author SHA1 Message Date
40c320a1ac Common commit 2025-10-16 18:42:32 +07:00
124065d2ac Database dataclass is now working 2025-09-25 20:05:51 +07:00
3a88bdad3a Harden image fetch algorithm; Prepare for database integration 2025-09-24 20:07:19 +07:00
88d40c0d99 Add civit model pull ability 2025-09-20 12:21:31 +07:00
991d655756 Add civit model pull ability 2025-09-20 12:12:56 +07:00
65bffc38eb Add first example script 2025-09-19 18:16:26 +07:00
fd0abfd713 Add civit module; Add civit.fetch function 2025-09-19 18:15:03 +07:00
817283034a Add civit module; Add civit.fetch function 2025-09-19 18:12:57 +07:00
70e6e7a00b Merge remote-tracking branch 'origin/main' 2025-09-16 20:15:35 +07:00
154133128c Clean garbage 2025-09-16 20:14:20 +07:00
63 changed files with 3766 additions and 16 deletions

4
.gitignore vendored

@@ -1,3 +1,5 @@
__pycache__/
repo/
deprecated
deprecated
/.idea/.gitignore
/.idea/

115
doc/CivitFetchPaseudocode Normal file

@@ -0,0 +1,115 @@
# Simple json request module
aliases:
    retry = "retry 10 times with 1 time(s) cooldown"
start
    try get request
    -- Network errors block
    Having: url
        Exception: Network is unreachable or temporary failure in name resolution
            Wait until network becomes available
        Exception: Name not resolved
            Repeat 10 times with 10x cooldown
            Fatal: target site is dead
    -- HTTP errors block
    Having: Some HTTP response
        Exception: Service unavailable
            Repeat 10 times with 10x cooldown
            Throw exception to higher level
        Exception: Internal server error and other HTTP errors (403, 404, ...)
            retry
            Throw exception to higher level
    -- Content errors block
    Having: Some successful HTTP response
        Raised: Service unavailable
            wait until initial page becomes available
            retry
            try strip cursor if cursor crawler
            retry
            try decrement cursor/page
            retry
            try increment cursor/page
            retry
        Raised: Internal server error and other HTTP errors (403, 404, ...)
            try strip cursor if cursor crawler
            retry
            try decrement cursor/page
            retry
            try increment cursor/page
            retry
        Exception: Response is not json data
            retry
            try strip cursor and retry if cursor crawler
            try decrement cursor/page and retry 1 time
            try increment cursor/page and retry 1 time
            log error and end crawl
    Having: Some json data
        Exception: Response does not contain {items: list, metadata: dict} fields
            retry
            try strip cursor and retry if cursor crawler
            try decrement cursor/page and retry 1 time
            try increment cursor/page and retry 1 time
            log error and end crawl
        Exception: items is empty and metadata is empty
            retry
            try strip cursor and retry if cursor crawler
            try decrement cursor/page and retry 1 time
            try increment cursor/page and retry 1 time
            log error and end crawl
        Exception: items is empty and metadata is not empty
            if result of (try decrement cursor/page and retry 1 time) is 1: end crawl
            retry
            try strip cursor and retry if cursor crawler
            try decrement cursor/page and retry 1 time
            log error and end crawl
        Exception: if cursor crawler: metadata does not have required field "nextPage"
            retry
            try strip cursor and retry if cursor crawler
            try decrement cursor/page and retry 1 time
            try increment cursor/page and retry 1 time
            log error and end crawl
        ExitPoint: items is not empty and metadata is empty
            end crawl
    Having: Some valid json api response (items is not empty and not cursor crawler or (end crawl flag is set or metadata is not empty))
        Exception: Cursor slip (nextPage url equals request url)
            if not cursor crawler: pass (not possible)
            try increment cursor/page and retry 1 time
        Exception: response "items" has no new items (may be caused by cursor system corruption, or the rare case where total_items mod page_items_limit is 0)
            try strip cursor and retry if cursor crawler
            try increment cursor/page and retry 1 time
            log error and end crawl
        Warning: Added items != page_items_limit and not end crawl
            log warning
    Having: some items, added to the all-crawl items dict

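The pseudocode above maps onto a fairly standard layered retry loop. Below is a minimal Python sketch of that shape; fetch_json and the RETRIES/COOLDOWN constants are illustrative and not part of the repository:

import time
import requests

RETRIES = 10      # "retry 10 times ..."
COOLDOWN = 1.0    # "... with 1 time(s) cooldown"

def fetch_json(url: str) -> dict:
    """Layered retries: network errors, HTTP errors, then content validation."""
    for attempt in range(RETRIES):
        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()        # -- HTTP errors block
            data = response.json()             # Exception: response is not json data
            if 'items' not in data or 'metadata' not in data:
                raise ValueError('missing {items, metadata} fields')
            return data                        # Having: some valid json api response
        except (requests.ConnectionError, requests.Timeout):
            time.sleep(COOLDOWN * 10)          # -- Network errors block: wait, then retry
        except (requests.HTTPError, requests.JSONDecodeError, ValueError):
            time.sleep(COOLDOWN)               # cursor strip/decrement/increment would go here
    raise RuntimeError(f'giving up on {url}')  # log error and end crawl
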
View File

@@ -0,0 +1,3 @@
pythonapp create venv python3 comfy-fresh
pythonapp load app comfy-fresh
pythonapp install gitapp https://github.com/comfyanonymous/ComfyUI.git mod:models

0
gui.py Normal file

View File

@@ -1,4 +1,5 @@
from shell.Interactive import Interactive
Interactive().start()

View File

@@ -49,4 +49,61 @@ class SetsDict:
return self._data.get(key, None)
@property
def keys(self): return self._data.keys()
def keys(self): return self._data.keys()
def select_elements(lst, selection_string):
"""
Selects elements from a list according to a selection string
Args:
lst: Source list
selection_string: String like "1 2 4-6 all"
Returns:
A new list with the selected elements, ordered by index
"""
selection_string = selection_string.strip()
if not selection_string:
return []
if selection_string == "all":
return lst.copy()
selected_indices = set()
parts = selection_string.split()
for part in parts:
if '-' in part:
# Handle a range
start, end = map(int, part.split('-'))
# Ranges may run in either direction
if start <= end:
selected_indices.update(range(start, end + 1))
else:
selected_indices.update(range(start, end - 1, -1))
else:
# Handle a single index
selected_indices.add(int(part))
# Convert to a list sorted by index
sorted_indices = sorted(selected_indices)
# Pick the elements
result = []
for idx in sorted_indices:
if 0 <= idx < len(lst):
result.append(lst[idx])
return result
def format_bytes(bytes_size):
"""Convert bytes to human readable format"""
if bytes_size < 1024:
return f"{bytes_size} B"
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} PB"

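For reference, the expected behavior of the two helpers added above, traced from the code (indices are zero-based; ranges are inclusive and may run in either direction):

select_elements(['a', 'b', 'c', 'd', 'e', 'f', 'g'], "1 2 4-6")  # ['b', 'c', 'e', 'f', 'g']
select_elements(['a', 'b', 'c'], "all")                          # ['a', 'b', 'c']
format_bytes(512)            # '512 B'
format_bytes(1536)           # '1.5 KB'
format_bytes(3 * 1024 ** 3)  # '3.0 GB'
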
View File

@@ -5,7 +5,7 @@ from dataclasses import dataclass, fields
from pathlib import Path
from typing import List
from pythonapp.Libs.ConfigDataClass import Config
from modules.shared.ConfigDataClass import Config
@dataclass
@@ -22,12 +22,15 @@ class PackageInfo(Config):
quantization: str = "" # fp8, bf16
dependencies: List[str] = None
resources: List[str] = None
tags: List[str] = None
def __post_init__(self):
if self.dependencies is None:
self.dependencies = []
if self.resources is None:
self.resources = []
if self.tags is None:
self.tags = []
super().__post_init__()
@@ -58,7 +61,7 @@ class ModelPackage:
field_value = getattr(package_info, field_name)
if field_value is not None and field_value != "" and field_name != "filename":
current_value = getattr(self.info, field_name)
if current_value is None or current_value == "" or current_value == 0 or len(current_value) == 0:
if current_value is None or current_value == "" or current_value == 0 or len(str(current_value)) == 0:
setattr(self.info, field_name, field_value)
# Generate a UUID if it is not defined
@@ -131,6 +134,15 @@ class ModelPackage:
resources.append(resource)
package_info.resources = resources
print("Теги (введите по одному, пустая строка для завершения):")
tags = []
while True:
tag = input().strip()
if not tag:
break
tags.append(resource)
package_info.tags = tags
# Генерируем UUID случайным образом (не запрашиваем у пользователя)
package_info.uuid = pkg_uuid
if not package_info.uuid:
@@ -206,6 +218,7 @@ class ModelPackage:
provides_list = self.info.resources.copy() # Return a copy
if self.info.name: # Add the package name if present
provides_list.append(self.info.name)
provides_list.extend(self.info.tags)
return provides_list
@classmethod

View File

@@ -0,0 +1,50 @@
from dataclasses import dataclass
from pathlib import Path
from modules.shared.ConfigDataClass import Config
@dataclass
class ModelPackageCollection(Config):
name: str = None
external_packages: list[str] = None
unsorted_packages: list[str] = None
categorized_packages: dict[str, list[str]] = None
def __post_init__(self):
if not self.external_packages: self.external_packages = list()
if not self.unsorted_packages: self.unsorted_packages = list()
if not self.categorized_packages: self.categorized_packages = dict()
super().__post_init__()
if not self.name: raise ValueError('ModelPackageCollection(): name must be specified')
def add_package(self, pkg_name, category: str = None, internal=True):
if not internal:
if pkg_name not in self.external_packages: self.external_packages.append(pkg_name)
elif not category:
if pkg_name not in self.unsorted_packages: self.unsorted_packages.append(pkg_name)
else:
if category not in self.categorized_packages: self.categorized_packages[category] = list()
if pkg_name not in self.categorized_packages[category]: self.categorized_packages[category].append(pkg_name)
self.save()
def get_path(self, pkg_name) -> Path:
if pkg_name in self.external_packages: return Path('')
elif pkg_name in self.unsorted_packages: return Path(self.name)
else:
for category in self.categorized_packages:
if pkg_name in self.categorized_packages[category]: return Path(self.name) / category
raise FileNotFoundError(f'package {pkg_name} not in collection {self.name}')
@property
def paths_dict(self) -> dict[str, Path]:
result: dict[str, Path] = dict()
for category in self.categorized_packages:
for pkg_name in list(self.categorized_packages[category]):
result[pkg_name] = Path(self.name) / category
for pkg_name in list(self.unsorted_packages): result[pkg_name] = Path(self.name)
for pkg_name in list(self.external_packages): result[pkg_name] = Path('')
return result

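A short usage sketch of the collection semantics above; the file path and package names are hypothetical, and add_package persists via Config.save() on every call:

col = ModelPackageCollection('repo/anime_collection.json', name='anime')
col.add_package('pkg-a', category='characters')  # categorized: anime/characters
col.add_package('pkg-b')                         # unsorted: anime
col.add_package('pkg-c', internal=False)         # external: Path('')
col.get_path('pkg-a')                            # Path('anime/characters')
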
View File

@@ -1,8 +1,13 @@
import json
import os
import uuid
import warnings
from pathlib import Path
from modelspace.Essentials import SetsDict
from modelspace.ModelPackage import ModelPackage
from modelspace.Essentials import SetsDict, select_elements, format_bytes
from modelspace.ModelPackage import ModelPackage, PackageInfo
from modelspace.ModelPackageCollection import ModelPackageCollection
from modules.civit.client import Client
class ModelPackageSubRepository:
@@ -11,10 +16,11 @@ class ModelPackageSubRepository:
self.path = Path(path)
self.seed = seed
self.packages: dict[str, ModelPackage] | None = None
self.package_names: set[str] | None = None
self.resources: SetsDict | None = None
self.collections: dict[str, ModelPackageCollection] | None = None
self.reload()
# Completed
def _reload_packages(self):
self.packages = dict()
try:
@@ -27,7 +33,8 @@ class ModelPackageSubRepository:
package = ModelPackage.load(str(self.path / d))
self.packages[package.uuid] = package
# Completed
self.package_names = {p.name for id, p in self.packages.items()}
def _reload_resources(self):
self.resources = SetsDict()
@@ -35,12 +42,35 @@ class ModelPackageSubRepository:
for resource in package.provides:
self.resources.add(resource, pkg_id)
def _reload_collections(self):
try:
filenames = [item.name for item in self.path.iterdir() if item.is_file() and item.name.endswith('_collection.json')]
except OSError as e:
print(f"Ошибка доступа к директории: {e}")
return
self.collections = dict()
for filename in filenames:
collection = ModelPackageCollection(filename=str(Path(self.path) / filename))
self.collections[collection.name] = collection
def reload(self):
self._reload_packages()
self._reload_resources()
self._reload_collections()
def add_package_to_collection(self, pkg_name, collection_name, category: str = None, internal=True):
if pkg_name not in self.package_names:
if pkg_name in self.resources.keys or pkg_name in self.collections: raise RuntimeWarning('Only packages can be added to collections')
else: raise RuntimeWarning(f'Package {pkg_name} not found')
if collection_name not in self.collections:
self.collections[collection_name] = ModelPackageCollection(
self.path / (collection_name + '_collection.json'), name=collection_name, autosave=True
)
self.collections[collection_name].add_package(pkg_name, category, internal)
# debugged
def resources_from_pkg_list(self, uuids: list[str]):
selected_packages = []
for pkg_id in uuids:
@@ -60,7 +90,6 @@ class ModelPackageSubRepository:
for package in packages: res = res | set(package.dependencies)
return res
# debugged
def packages_by_resource(self, resource):
packages_ids = self.resources.by_key(resource)
@@ -73,7 +102,6 @@ class ModelPackageSubRepository:
for pkg_id in packages_ids: packages.add(self.package_by_id(pkg_id))
return packages
# debugged
def package_by_id(self, pkg_id):
package = self.packages.get(pkg_id, None)
if not package: raise RuntimeError(f"{pkg_id}: Something went wrong while reading package info")
@@ -92,4 +120,254 @@ class ModelPackageSubRepository:
package = ModelPackage.interactive(str(package_path), package_uuid)
loaded_package = ModelPackage.load(str(package_path))
self.packages[loaded_package.uuid] = loaded_package
return package
# Add the package to collections
self._add_package_to_collections_interactive(package)
return package
def _add_package_to_collections_interactive(self, package: ModelPackage):
while True:
print('Input collections, blank for stop')
collection = input().strip()
if collection == '': break
external = input('External? (blank for no): ').strip()
if external != '':
self.add_package_to_collection(package.name, collection, category=None, internal=False)
continue
category = input('Category: ').strip()
if category == '':
self.add_package_to_collection(package.name, collection, category=None, internal=True)
continue
else:
self.add_package_to_collection(package.name, collection, category, internal=True)
continue
def pull_civit_package(self, client: Client, model_id: int, version_id: int = None, file_id: int = None):
model_info = client.get_model_raw(model_id)
model_versions = model_info.get('modelVersions', None)
if not model_versions:
warnings.warn(f'Unable to find model {model_id}')
return
pull_candidates = list()
print('Model name:', model_info.get('name', None))
ic_package_type = model_info.get('type', None)
ic_tags = model_info.get('tags', None)
for model_version in model_versions:
if not model_version.get('availability', None) or model_version.get('availability', None) != 'Public': continue
ic_version_id = model_version.get('id', None)
ic_version = model_version.get('name', None)
ic_release_date = model_version.get('publishedAt', None)
ic_lineage = model_version.get('baseModel', None)
ic_images = None
images = model_version.get('images', None)
if images and isinstance(images, list): ic_images = [i.get('url', None) for i in images if i.get('url', None) is not None]
ic_provides = [f'civit-{model_id}-{ic_version_id}', f'civit-{model_id}'].copy()
for file in model_version.get('files', list()):
ic_size_bytes = file.get('sizeKB', None)
if ic_size_bytes and isinstance(ic_size_bytes, float):
ic_size_bytes = int(ic_size_bytes * 1024)
metadata = file.get('metadata', None)
ic_quantisation = None
if metadata:
ic_quantisation = metadata.get('fp', None)
ic_file_id = file.get('id', None)
ic_filename = file.get('name', None)
if file.get('type', None) and file.get('type', None) != 'Model':
continue
ic_url = file.get('downloadUrl', None)
ic_model_info = model_info.copy()
ic_name = f'civit-{model_id}-{ic_version_id}-{ic_file_id}'
ic_uuid = ic_name
pull_candidates.append({
'uuid': ic_uuid,
'name': ic_name,
'provides': ic_provides,
'version_id': ic_version_id,
'file_id': ic_file_id,
'package_type': ic_package_type.lower(),
'tags': ic_tags,
'version': ic_version,
'release_date': ic_release_date,
'lineage': ic_lineage,
'images': ic_images or list(),
'size_bytes': ic_size_bytes,
'quantisation': ic_quantisation or '',
'url': ic_url,
'filename': ic_filename,
'model_info': ic_model_info,
})
try:
del file, ic_url, ic_package_type, ic_images, ic_release_date, ic_tags, ic_filename, ic_lineage, model_info
del ic_model_info, ic_quantisation, ic_size_bytes, ic_version, images, metadata, model_version, model_versions
del ic_file_id, ic_version_id, ic_uuid, ic_name, ic_provides
except Exception:
pass
# check already pulled packages
already_pulled = list()
available_to_pull = list()
for candidate in pull_candidates:
if candidate['name'] in self.package_names: already_pulled.append(candidate)
else: available_to_pull.append(candidate)
if version_id: available_to_pull = [p for p in available_to_pull if p['version_id'] == version_id]
if file_id: available_to_pull = [p for p in available_to_pull if p['file_id'] == file_id]
if len(available_to_pull) == 0:
warnings.warn(f'Pull candidate not found for model_id:{model_id} and version_id:{version_id} and file_id:{file_id}')
return
# selection output
if len(already_pulled) > 0:
print('Already pulled packages:')
print(f' {'N':<{2}} {'version':<{10}} {'type':<{10}} {'release_date':<{25}}'
f' {'lineage':<{10}} {'quant':<{5}} {'size':<{10}} ')
for i, candidate in enumerate(already_pulled):
print(
f' {i:<{2}} {candidate['version']:<{10}} {candidate['package_type']:<{10}} {candidate['release_date']:<{25}}'
f' {candidate['lineage']:<{10}} {candidate['quantisation']:<{5}} {format_bytes(candidate['size_bytes']):<{10}} ')
if len(available_to_pull) == 0:
print('All available packages already pulled')
return
else:
print('Available packages:')
print(f' {'N':<{2}} {'version':<{10}} {'type':<{10}} {'release_date':<{25}}'
f' {'lineage':<{10}} {'quant':<{5}} {'size':<{10}} ')
for i in range(len(available_to_pull)):
candidate = available_to_pull[i]
quantisation = candidate['quantisation'] or 'N/A'
print(f' {i:<{2}} {candidate['version']:<{10}} {candidate['package_type']:<{10}} {candidate['release_date']:<{25}}'
f' {candidate['lineage']:<{10}} {quantisation:<{5}} {format_bytes(candidate['size_bytes']):<{10}} ')
if len(available_to_pull) > 1: to_pull = select_elements(available_to_pull, input("Your choice: "))
else: to_pull = available_to_pull
# Input dependencies
print("Dependencies (enter one per line, blank line to finish):")
additional_dependencies = []
while True:
dep = input().strip()
if not dep:
break
additional_dependencies.append(dep)
# Input resources
print("Resources (enter one per line, blank line to finish):")
additional_resources = []
while True:
resource = input().strip()
if not resource:
break
additional_resources.append(resource)
print("Tags (enter one per line, blank line to finish):")
additional_tags = []
while True:
tag = input().strip()
if not tag:
break
additional_tags.append(tag)
while True:
print('One collection for all selected packages, blank for None')
collection = input().strip()
if collection == '': break
external = input('External? (blank for no): ').strip()
if external != '':
category: str | None = None
internal = False
break
category = str(input('Category: ')).strip()
if category == '':
category: str | None = None
internal = True
break
else:
internal = True
break
pulled: list[ModelPackage] = list()
for candidate in to_pull:
package_path = self.path / candidate['uuid']
if os.path.exists(str(Path(package_path) / "package.json")): raise RuntimeError("package exists!")
package_info = PackageInfo(str(Path(package_path) / "package.json"))
package_info.uuid = candidate['uuid']
package_info.name = candidate['name']
# TODO list of additional resources and dependencies, based on lineage
# TODO add deps and resources based on lineage (use civit lineages)
package_info.resources = candidate['provides'].copy()
package_info.resources.extend(additional_resources)
package_info.dependencies = additional_dependencies.copy()
package_info.tags = candidate['tags'].copy()
package_info.tags.extend(additional_tags)
package_info.lineage = candidate['lineage'].lower()
# TODO cast package types (diffusion_model or checkpoint) (use civit lineages)
package_info.package_type = candidate['package_type'].lower()
package_info.version = candidate['version']
package_info.release_date = candidate['release_date']
package_info.size_bytes = candidate['size_bytes']
package_info.quantisation = candidate['quantisation']
package_info.save()
os.makedirs(package_path / 'files')
with open(package_path / 'model_info.json', 'w') as f:
json.dump(candidate['model_info'], f, indent=2, ensure_ascii=False)
print('Pulling model...')
client.download_file(url=candidate['url'], path=package_path / 'files' / candidate['filename'])
print('Pulling main thumbnail...')
preview = candidate['images'][0]
dir, file = str(preview).rsplit('/', maxsplit=1)
image_name, image_extension = str(file).rsplit('.', maxsplit=1)
ckpt = candidate['filename']
ckpt_name, ckpt_extension = str(ckpt).rsplit('.', maxsplit=1)
client.download_file(url=preview, path=package_path / 'files' / (ckpt_name + '.' + image_extension))
os.makedirs(package_path / 'images')
print('Pulling thumbnails...')
for image in candidate['images']:
dir, file = str(image).rsplit('/', maxsplit=1)
client.download_file(url=image, path=package_path / 'images' / file)
package = ModelPackage(package_path, [], package_info)
self.packages[package.uuid] = package
self.package_names.add(package.name)
self.add_package_to_collection(package.name, 'civit', internal=True)
if collection: self.add_package_to_collection(package.name, collection, category, internal=internal)
pulled.append(package)
for package in pulled:
info = package.info
print('Collections for package:')
print(f' {'N':<{2}} {'version':<{10}} {'type':<{10}} {'release_date':<{25}}'
f' {'lineage':<{10}} {'quant':<{5}} {'size':<{10}} ')
print(
f' {'N':<{2}} {info.version:<{10}} {info.package_type:<{10}} {info.release_date:<{25}}'
f' {info.lineage:<{10}} {info.quantization:<{5}} {format_bytes(info.size_bytes):<{10}} ')
self._add_package_to_collections_interactive(package)

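For orientation, the pull flow above reduces to: fetch the model metadata, list public version/file candidates, let the user select, then materialize each selection as a package directory named by the civit-<model>-<version>-<file> scheme, holding package.json, model_info.json, the model file plus a matching preview under files/, and all thumbnails under images/. A hedged call sketch (constructor arguments and ids are hypothetical):

repo = ModelPackageSubRepository('repo/main', seed=None)  # path/seed as in the constructor above
client = Client('data/civit')
repo.pull_civit_package(client, model_id=12345)                  # interactive selection
repo.pull_civit_package(client, model_id=12345, version_id=678)  # pre-filtered pull
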
View File

@@ -4,7 +4,7 @@ from pathlib import Path
from modelspace.ModelPackage import ModelPackage
from modelspace.ModelPackageSubRepository import ModelPackageSubRepository
from pythonapp.Libs.ConfigDataClass import Config
from modules.shared.ConfigDataClass import Config
@dataclass

0
modules/__init__.py Normal file

76
modules/civit/Civit.py Normal file

@@ -0,0 +1,76 @@
import datetime
import json
import os
from pathlib import Path
from modules.civit.client import Client
from modules.civit.fetch import Fetch
from modules.civit.datamodel import *
from modules.shared.DatabaseAbstraction import Database
class Civit:
def __init__(self, db: Database, path, api_key = None):
self._db = db
self.path = path
self.client = Client(path, api_key)
self.fetcher = Fetch(self.client)
Creator.create(self._db.cursor())
Tag.create(self._db.cursor())
Model.create(self._db.cursor())
def save(self, e: DataClassDatabase): return e.save(self._db.cursor())
def from_fetch(self, entity: str, entity_type: type[DataClassDatabase]):
if entity: entity = entity.lower()
else: return
if entity in self.fetcher.entities: subdir = self.fetcher.entities[entity]
else: raise ValueError(f'Civit doesn\'t have entity type {entity}')
directory_path = str(Path(self.client.path) / subdir)
files = os.listdir(directory_path)
i = 0
files_count = len(files)
tp = datetime.datetime.now()
# Iterate over all files in the directory
for filename in files:
i += 1
print(f'processing file {i} of {files_count} ({float(i) / float(files_count) * 100:.2f}%): {filename} Elapsed time {datetime.datetime.now() - tp}')
tp = datetime.datetime.now()
if not filename.endswith('.json'): continue
file_path = os.path.join(directory_path, filename)
data = None
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# If the data is a list of dicts
if isinstance(data, list):
pass
# If the data is a single dict
elif isinstance(data, dict):
data = [data]
except (json.JSONDecodeError, IOError) as e:
print(f"Error reading file {filename}: {e}")
continue
if not data: continue
t = datetime.datetime.now()
j = 0
data_count = len(data)
for d in data:
j += 1
self.save(entity_type.from_dict(d))
if j % 1000 == 0:
print(f'saved {j} {entity} of {data_count} ({float(j) / float(data_count) * 100:.2f}%). Elapsed time {datetime.datetime.now() - t}')
t = datetime.datetime.now()
del d, data

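A minimal sketch of wiring Civit to a database and replaying previously fetched JSON into it; the Database constructor is an assumption (it only needs to provide cursor()), while from_fetch and the datamodel classes come from this diff:

from modules.shared.DatabaseAbstraction import Database
from modules.civit.Civit import Civit
from modules.civit.datamodel import Model, Tag

db = Database('civit.sqlite')      # assumed constructor
civit = Civit(db, 'data/civit')    # Creator/Tag/Model tables are created here
civit.from_fetch('tags', Tag)      # replays fetch_tags/*.json into the tags table
civit.from_fetch('models', Model)  # replays fetch_models/*.json into models and children
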
176
modules/civit/client.py Normal file

@@ -0,0 +1,176 @@
import os
from dataclasses import dataclass
from pathlib import Path
import requests
import time
from typing import Optional
from requests import Session
from modules.shared.ConfigDataClass import Config
@dataclass
class ClientConfig(Config):
api_key: str = ''
base_url: str = 'https://civitai.com/'
class Client:
def __init__(self, path, api_key: str = None):
self.path = path
os.makedirs(self.path, exist_ok=True)
self.config = ClientConfig(str(Path(self.path) / 'config.json'), autosave=True)
if self.config.api_key == '': self.config.api_key = api_key
self.config.save()
self._headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.config.api_key}'}
self.session = Session()
self.session.headers.update(self._headers)
pass
def enroll_key(self, key: str):
self.config.api_key = key
self.config.save()
@staticmethod
def build_query_string(params):
"""Build query string from dictionary of parameters
Args:
params (dict): Dictionary of parameters
Returns:
str: Query string in format '?param1=value1&param2=value2'
"""
if not params:
return ""
filtered_params = {k: v for k, v in params.items() if v is not None}
if not filtered_params:
return ""
query_parts = []
for key, value in filtered_params.items():
query_parts.append(f"{key}={value}")
return "?" + "&".join(query_parts)
def make_get_request(self, url: str, max_retries: int = 10, delay: float = 3.0,
timeout: int = 300, **kwargs) -> Optional[requests.Response]:
"""
Performs a GET request with error handling and retries
Args:
url (str): URL to request
max_retries (int): Maximum number of retry attempts (default 10)
delay (float): Delay between attempts in seconds (default 3.0)
timeout (int): Request timeout in seconds (default 300)
**kwargs: Additional arguments passed to requests.get()
Returns:
Optional[requests.Response]: Response object, or None on failure
"""
session = self.session
for attempt in range(max_retries + 1):
try:
response = session.get(url, timeout=timeout, **kwargs)
response.raise_for_status() # Raises an exception for HTTP error statuses
return response
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout) as e:
if attempt == max_retries:
print(f"Request failed after {max_retries} retries: {e}")
return None
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay} seconds...")
time.sleep(delay)
return None
def get_creators_tags_raw(self, entity: str, page=None, limit = 200, query = None):
if not limit: limit = 200
if entity not in {'creators', 'tags'}: raise ValueError('Not in types')
response = self.make_get_request(
url = self.config.base_url + 'api/v1/' + entity + self.build_query_string(
{'page': page, 'limit': limit, 'query': query}
)
)
if not response: return {}
return response.json()
def get_creators_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('creators', page, limit, query)
def get_tags_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('tags', page, limit, query)
def get_model_raw(self, model_id: int):
# make_get_request swallows request exceptions and returns None, so check for that instead
response = self.make_get_request(f'{self.config.base_url}api/v1/models/{model_id}')
if not response: return {}
return response.json()
def download_file(self, url: str, path: str, chill_time: int = 3, max_retries: int = 3):
"""
Downloads a file from a URL to the given path
Args:
url (str): URL of the file to download
path (str): Path to save the file to
chill_time (int): Wait time in seconds after an error (default 3)
max_retries (int): Maximum number of download attempts (default 3)
"""
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
for attempt in range(max_retries):
try:
# Create a streaming request so progress can be shown
response = self.session.get(url, stream=True, timeout=30)
response.raise_for_status()
# Get the file size
total_size = int(response.headers.get('content-length', 0))
# Download the file in chunks
with open(path, 'wb') as file:
downloaded = 0
for chunk in response.iter_content(chunk_size=8192):
if chunk:
file.write(chunk)
downloaded += len(chunk)
# Display progress
if total_size > 0:
progress = (downloaded / total_size) * 100
print(f"\rDownloading: {progress:.1f}% ({downloaded}/{total_size} bytes)", end='',
flush=True)
print(f"\nFile downloaded successfully to {path}")
return True
except requests.exceptions.RequestException as e:
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
print(f"Waiting {chill_time} seconds before retry...")
time.sleep(chill_time)
else:
print(f"Failed to download file after {max_retries} attempts")
return False
except IOError as e:
print(f"IO Error while saving file: {e}")
return False
except Exception as e:
print(f"Unexpected error: {e}")
return False
return False

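Example of the client surface added above, traced from the code; note that build_query_string does no URL-encoding, so values must already be URL-safe (the api key and model id are placeholders):

from modules.civit.client import Client

client = Client('data/civit', api_key='YOUR_KEY')                    # key is a placeholder
Client.build_query_string({'page': 2, 'limit': 200, 'query': None})  # '?page=2&limit=200'
model = client.get_model_raw(4201)                                   # illustrative id; {} on failure
if model.get('modelVersions'):
    url = model['modelVersions'][0]['files'][0]['downloadUrl']
    client.download_file(url, 'data/civit/model.safetensors')        # returns True/False
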
233
modules/civit/datamodel.py Normal file

@@ -0,0 +1,233 @@
from dataclasses import dataclass
from typing import Optional
from modules.shared.DataClassDatabase import DataClassDatabase
@dataclass
class ModelVersionFileHashes(DataClassDatabase):
# primitives
AutoV1: Optional[str] = None
AutoV2: Optional[str] = None
AutoV3: Optional[str] = None
CRC32: Optional[str] = None
SHA256: Optional[str] = None
BLAKE3: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_versions_files_hashes'
self._standalone_entity = False
@dataclass
class ModelVersionFileMetadata(DataClassDatabase):
# primitives
format: Optional[str] = None
size: Optional[str] = None
fp: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_versions_files_metadata'
self._standalone_entity = False
@dataclass
class ModelVersionFile(DataClassDatabase):
# primary key
id: Optional[int] = None
# primitives
sizeKB: Optional[float] = None
name: Optional[str] = None
type: Optional[str] = None
pickleScanResult: Optional[str] = None
pickleScanMessage: Optional[str] = None
virusScanResult: Optional[str] = None
virusScanMessage: Optional[str] = None
scannedAt: Optional[str] = None
downloadUrl: Optional[str] = None
primary: Optional[bool] = None
# child entities
metadata: Optional[ModelVersionFileMetadata] = None
hashes: Optional[ModelVersionFileHashes] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {
'metadata': ModelVersionFileMetadata,
'hashes': ModelVersionFileHashes,
}
self._key_field = 'id'
self._table_name = 'model_versions_files'
self._standalone_entity = False
@dataclass
class ModelVersionStats(DataClassDatabase):
downloadCount: Optional[int] = None
ratingCount: Optional[int] = None
rating: Optional[float] = None
thumbsUpCount: Optional[int] = None
thumbsDownCount: Optional[int] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_versions_stats'
self._standalone_entity = False
@dataclass
class ModelVersionImage(DataClassDatabase):
# primary key
id: Optional[int] = None
# primitives
url: Optional[str] = None
nsfwLevel: Optional[int] = None
width: Optional[int] = None
height: Optional[int] = None
hash: Optional[str] = None
type: Optional[str] = None
minor: Optional[bool] = None
poi: Optional[bool] = None
hasMeta: Optional[bool] = None
hasPositivePrompt: Optional[bool] = None
onSite: Optional[int] = None
remixOfId: Optional[int] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'id'
self._table_name = 'model_versions_images'
self._standalone_entity = False
@dataclass
class ModelVersion(DataClassDatabase):
# primary key
id: Optional[int] = None
# primitives
index: Optional[int] = None
name: Optional[str] = None
baseModel: Optional[str] = None
baseModelType: Optional[str] = None
publishedAt: Optional[str] = None
availability: Optional[str] = None
nsfwLevel: Optional[int] = None
description: Optional[str] = None
supportsGeneration: Optional[bool] = None
downloadUrl: Optional[str] = None
# list of primitives
trainedWords: Optional[list] = None
# child entities
stats: Optional[ModelVersionStats] = None
files: Optional[list[ModelVersionFile]] = None
images: Optional[list[ModelVersionImage]] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {
'stats': ModelVersionStats,
'files': ModelVersionFile,
'images': ModelVersionImage,
}
self._key_field = 'id'
self._table_name = 'model_versions'
self._standalone_entity = False
@dataclass
class ModelStats(DataClassDatabase):
# primitives
downloadCount: Optional[int] = None
favoriteCount: Optional[int] = None
thumbsUpCount: Optional[int] = None
thumbsDownCount: Optional[int] = None
commentCount: Optional[int] = None
ratingCount: Optional[int] = None
rating: Optional[int] = None
tippedAmountCount: Optional[int] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_stats'
self._standalone_entity = False
@dataclass
class ModelCreator(DataClassDatabase):
# primitives
username: Optional[str] = None
image: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_creators'
self._standalone_entity = False
@dataclass
class Model(DataClassDatabase):
# primary key
id: Optional[int] = None
# primitives
name: Optional[str] = None
description: Optional[str] = None
allowNoCredit: Optional[bool] = None
allowCommercialUse: Optional[list] = None
allowDerivatives: Optional[bool] = None
allowDifferentLicense: Optional[bool] = None
type: Optional[str] = None
minor: Optional[bool] = None
sfwOnly: Optional[bool] = None
poi: Optional[bool] = None
nsfw: Optional[bool] = None
nsfwLevel: Optional[int] = None
availability: Optional[str] = None
cosmetic: Optional[str] = None
supportsGeneration: Optional[bool] = None
mode: Optional[str] = None
# list of primitives
tags: Optional[list] = None
# child entities
stats: Optional[ModelStats] = None
creator: Optional[ModelCreator] = None
modelVersions: Optional[list[ModelVersion]] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {
'stats': ModelStats,
'creator': ModelCreator,
'modelVersions': ModelVersion,
}
self._key_field = 'id'
self._table_name = 'models'
self._standalone_entity = True
@dataclass
class Tag(DataClassDatabase):
name: Optional[str] = None
link: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'name'
self._table_name = 'tags'
self._standalone_entity = True
@dataclass
class Creator(DataClassDatabase):
# primary key
username: Optional[str] = None
# primitives
modelCount: Optional[int] = None
link: Optional[str] = None
image: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'username'
self._table_name = 'creators'
self._standalone_entity = True

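The _forwarding maps declare which JSON keys deserialize into nested DataClassDatabase children. A hedged round-trip sketch follows; from_dict, create and save live in DataClassDatabase (outside this diff), so their exact signatures are inferred from the calls in Civit.py, and the sqlite3 cursor stands in for the project's Database.cursor():

import sqlite3
from modules.civit.datamodel import Model

payload = {
    'id': 1, 'name': 'Example', 'type': 'LORA', 'tags': ['anime'],
    'creator': {'username': 'someone'},
    'modelVersions': [{'id': 10, 'name': 'v1.0', 'files': [{'id': 100, 'name': 'a.safetensors'}]}],
}
cur = sqlite3.connect('civit.sqlite').cursor()  # stand-in for Database.cursor()
Model.create(cur)                 # create the models table and its child tables
model = Model.from_dict(payload)  # nested dicts/lists resolved via _forwarding
model.save(cur)                   # persist the row and its children
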
617
modules/civit/fetch.py Normal file

@@ -0,0 +1,617 @@
import datetime
import json
import time
import os
import warnings
from collections import defaultdict, Counter
from typing import Dict, List, Any, Tuple, Union
from pathlib import Path
from modules.civit.client import Client
class NetworkError(RuntimeError): pass
class ApiDataError(RuntimeError): pass
class CursorError(RuntimeError): pass
class EntityAnalyzer:
def __init__(self):
self.field_analysis = {}
def _get_json_files(self, directory_path: str) -> List[str]:
"""Получает список всех JSON файлов в директории"""
json_files = []
for filename in os.listdir(directory_path):
if filename.endswith('.json'):
json_files.append(os.path.join(directory_path, filename))
return json_files
def _load_json_data(self, file_path: str) -> List[Dict]:
"""Загружает данные из JSON файла"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
return data
else:
return [data]
except (json.JSONDecodeError, IOError) as e:
print(f"Ошибка чтения файла {file_path}: {e}")
return []
def _collect_all_entities(self, directory_path: str) -> List[Dict]:
"""Собирает все экземпляры из всех JSON файлов"""
all_entities = []
json_files = self._get_json_files(directory_path)
for file_path in json_files:
entities = self._load_json_data(file_path)
all_entities.extend(entities)
return all_entities
def _get_field_types(self, value: Any) -> str:
"""Определяет тип значения"""
if isinstance(value, dict):
return 'dict'
elif isinstance(value, list):
return 'list'
elif isinstance(value, bool):
return 'bool'
elif isinstance(value, int):
return 'int'
elif isinstance(value, float):
return 'float'
elif isinstance(value, str):
return 'str'
else:
return 'unknown'
def _get_main_type(self, types: List[str]) -> str:
"""Определяет основной тип из списка типов"""
if not types:
return 'unknown'
# Если есть dict или list - это сложная структура
if 'dict' in types or 'list' in types:
return 'complex'
# Иначе возвращаем первый тип (или объединяем)
unique_types = set(types)
if len(unique_types) == 1:
return types[0]
else:
return 'mixed'
def _is_hashable(self, value: Any) -> bool:
"""Проверяет, является ли значение хэшируемым"""
try:
hash(value)
return True
except TypeError:
return False
def _serialize_value_for_counter(self, value: Any) -> str:
"""Преобразует значение в строку для использования в Counter"""
if self._is_hashable(value):
return value
else:
# Для нехэшируемых типов используем строковое представление
return str(value)
def _analyze_fields_recursive(self, entity: Dict, parent_path: str,
field_types: Dict, field_presence: Dict,
field_values: Dict, top_n: int):
"""Рекурсивно анализирует поля сущности"""
if not isinstance(entity, dict):
return
for key, value in entity.items():
field_path = f"{parent_path}.{key}" if parent_path else key
# Record the field type
field_types[field_path].append(self._get_field_types(value))
# Mark the field as present
field_presence[field_path].append(True)
# Store the value for frequency counting (handles unhashable types)
if value is not None:
serialized_value = self._serialize_value_for_counter(value)
field_values[field_path].append(serialized_value)
# Recurse into nested structures
if isinstance(value, dict):
self._analyze_fields_recursive(value, field_path, field_types,
field_presence, field_values, top_n)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
self._analyze_fields_recursive(item, field_path, field_types,
field_presence, field_values, top_n)
def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]:
"""Анализирует структуру всех сущностей"""
if not entities:
return {}
# Collect all fields and their types
field_types = defaultdict(list)
field_presence = defaultdict(list)
field_values = defaultdict(list)
for entity in entities:
self._analyze_fields_recursive(entity, "", field_types, field_presence,
field_values, top_n)
# Build the final analysis
result = {}
for field_path, types in field_types.items():
# Determine the main type
main_type = self._get_main_type(types)
# Count how often the field is present
presence_count = len(field_presence[field_path])
total_count = len(entities)
always_present = presence_count == total_count
# Get the top N values
top_values = []
if field_path in field_values:
try:
# Count the serialized values and take the most common
value_counter = Counter(field_values[field_path])
top_values = [item[0] for item in value_counter.most_common(top_n)]
except Exception:
# On any error fall back to an empty list
top_values = []
result[field_path] = {
'type': main_type,
'always_present': always_present,
'top_values': top_values,
'total_count': total_count,
'presence_count': presence_count
}
return result
def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]:
"""
Основной метод анализа директории
Args:
directory_path: Путь к директории с JSON файлами
top_n: Количество самых частых значений для каждого поля
Returns:
Словарь с анализом структуры данных
"""
# Шаг 1: Собираем все экземпляры из JSON файлов
entities = self._collect_all_entities(directory_path)
# Шаг 2: Анализируем структуру сущностей
self.field_analysis = self._analyze_entity_structure(entities, top_n)
return self.field_analysis
class Fetch:
def __init__(self, client: Client, delay = 3):
self.items: dict[int, dict] = dict()
self.cursor_state = 'normal'
self.client = client
self.path = client.path
self.delay_time = delay
@staticmethod
def load_json_dir(directory_path):
"""
Получает путь к директории, находит в ней все файлы json,
читает из них списки словарей и возвращает один список со всеми словарями
Args:
directory_path (str): Путь к директории с JSON файлами
Returns:
list: Список всех словарей из всех JSON файлов
"""
all_dicts = []
files = os.listdir(directory_path)
# Проходим по всем файлам в директории
for filename in files:
if filename.endswith('.json'):
file_path = os.path.join(directory_path, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Если данные - список словарей
if isinstance(data, list):
all_dicts.extend(data)
# Если данные - один словарь
elif isinstance(data, dict):
all_dicts.append(data)
except (json.JSONDecodeError, IOError) as e:
print(f"Ошибка чтения файла {filename}: {e}")
continue
return all_dicts
entities = {
'creator': 'fetch_creators',
'creators': 'fetch_creators',
'tag': 'fetch_tags',
'tags': 'fetch_tags',
'model': 'fetch_models',
'models': 'fetch_models',
'image': 'fetch_images',
'images': 'fetch_images',
}
keys = {
'creator': 'username',
'creators': 'username',
'tag': 'name',
'tags': 'name',
'model': 'id',
'models': 'id',
'image': 'id',
'images': 'id',
}
@classmethod
def load(cls, client: Client, entity_type: str):
if entity_type in cls.entities: subdir = cls.entities[entity_type]
else: raise ValueError(f'Civit doesn\'t have entity type {entity_type}')
res = cls.load_json_dir(str(Path(client.path) / subdir))
return res
@classmethod
def datamodel(cls, client: Client, subdir, top = None):
if not top: top = 10
path = Path(client.path) / subdir
datamodel = EntityAnalyzer().analyze_directory(path, top_n=top)
return datamodel
@classmethod
def _save_json(cls, path, items):
with open(path, 'w') as f:
json.dump(items, f, indent=2, ensure_ascii=False)
@classmethod
def _msg(cls, msg_type, arg1 = None, arg2 = None, arg3 = None, arg4 = None, arg5 = None, arg6 = None, arg7 = None):
if msg_type == 'initial': msg = f"Fetching {arg1}..."
elif msg_type == 'extend_warn': msg = f"Warning! This fetch iteration has no effect"
elif msg_type == 'paginated_progress': msg = f"{arg1}: Fetching page {arg2} of {arg3}"
elif msg_type == 'network_warn': msg = f"Network error! {arg1}"
elif msg_type == 'api_warn': msg = f"API data error! {arg1}"
elif msg_type == 'cursor_warn': msg = f"Cursor slip error! {arg1}"
else: return
print(msg)
def reset(self, entity):
self._msg('initial', entity)
if entity not in self.entities: raise RuntimeError(f'Unknown entity: {entity}')
self.path = Path(self.client.path) / self.entities[entity]
self.path.mkdir(exist_ok=True)
def extend(self, entity: str, items: list[dict] = None, save: str = None):
if entity not in self.keys: raise RuntimeError(f'Unknown entity: {entity}')
prev_len = len(self.items)
if items and len(items) > 0:
for item in items: self.items[item[self.keys[entity]]] = item
else: raise RuntimeError('Try extend with empty items')
post_len = len(self.items)
if prev_len == post_len:
self._msg('extend_warn')
raise RuntimeWarning('Warning! This fetch iteration has no effect')
if save: self._save_json(self.path / save, items)
return post_len - prev_len
def delay(self, mult = 1): time.sleep(self.delay_time * mult)
@classmethod
def crawler_paginated_parse_metadata(cls, page):
metadata = page.get('metadata', None)
if not metadata: raise RuntimeError("Unable to find metadata")
total_pages = metadata.get('totalPages', None)
current_page = metadata.get('currentPage', None)
if not total_pages or not current_page: raise RuntimeError("Unable to parse metadata")
print(f"Found! Total pages: {total_pages}")
return total_pages, current_page
def crawler_paginated(self, entity: str, save = True):
self.reset(entity)
url = self.client.config.base_url + 'api/v1/' + entity + f'?limit=200'
first_page = self.client.get_creators_tags_raw(entity)
self.extend(entity, first_page.get('items', None), save='page_1.json' if save else None)
total_pages, current_page = self.crawler_paginated_parse_metadata(first_page)
for i in range(2, total_pages + 1):
self.delay()
self._msg('paginated_progress', entity, i, total_pages)
page = self.client.get_creators_tags_raw(entity, page=i)
self.extend(entity, page.get('items', None), save=f'page_{i}.json' if save else None)
if save: self.to_file('all.json')
return self.to_list()
def to_list(self): return [value for key, value in self.items.items()]
def to_file(self, filename, dirname = None):
if not dirname: dirname = self.path
self._save_json(Path(dirname) / filename, self.to_list())
def request_get(self, url, json=True, timeout=10, **kwargs):
try:
response = self.client.session.get(url, timeout=timeout, **kwargs)
response.raise_for_status()
self.delay()
if json:
return response.json()
else:
return response
except Exception as e:
raise NetworkError from e
@classmethod
def crawler_cursor_parse_metadata(cls, page):
metadata = page.get('metadata', None)
if not metadata: raise RuntimeError("Unable to find metadata")
next_page = metadata.get('nextPage', None)
next_cursor = metadata.get('nextCursor', None)
if not next_page or not next_cursor: raise RuntimeError("Unable to parse metadata")
return next_page, next_cursor
@staticmethod
def cursor_strip(url):
split = url.split('cursor=', maxsplit=1)
if len(split) < 2: return url
prefix = split[0] + 'cursor='
suffix = split[1].split('%', maxsplit=1)[0]
return prefix + suffix
def cursor_decrement(self, url): raise NotImplementedError
def cursor_increment(self, url): raise NotImplementedError
def cursor_state_reset(self, cursor = None):
self.cursor_state = 'normal'
return cursor
def cursor_fix(self, url, increment = False):
if self.cursor_state == 'normal':
self.cursor_state = 'stripped'
return self.cursor_strip(url)
elif self.cursor_state == 'stripped':
return self.cursor_increment(url) if increment else self.cursor_decrement(url)
else: raise RuntimeWarning(f'Invalid cursor state: {self.cursor_state}')
def crawler_cursor_request(self, url, counter = 50, reset = True):
if reset: self.cursor_state_reset()
while counter > 0:
try:
page = self.request_get(url)
if not page.get('items', None) or len(page.get('items', None)) == 0: raise ApiDataError
try:
next_page, next_cursor = self.crawler_cursor_parse_metadata(page)
except RuntimeError as e:
return page
if next_page == url: raise CursorError
return page
except NetworkError as e:
self.delay(10)
self._msg('network_warn', str(e))
url = self.cursor_fix(url)
except ApiDataError as e:
self._msg('api_warn', str(e))
url = self.cursor_fix(url)
except CursorError as e:
self._msg('cursor_warn', str(e))
url = self.cursor_fix(url, increment=True)
counter -= 1
return dict() # TODO handle this error
@classmethod
def crawler_cursor_avoid_slip(cls, client: Client, url, path, entity, slip_retries = 5, get_retries = 50, chill_time = 3):
slip_counter = 0
get_counter = 0
page = None
while True:
try:
page = client.make_get_request(url)
if not page: raise ValueError
page = page.json()
if not page.get('items', None) or len(page.get('items', None)) == 0: raise ValueError
try: next_page, next_cursor = cls.crawler_cursor_parse_metadata(page)
except RuntimeError as e: return page
if next_page == url: raise TypeError
# raise ValueError
return page
except ValueError:
get_counter = get_counter + 1
with open(Path(path) / '_get_error.log', 'a') as file:
file.write(f'{url}\n')
if get_counter >= get_retries: return page
if entity == 'images':
print("Trying avoid images get error by decreasing cursor position by 1")
split = url.rsplit('=', maxsplit=1)
prefix = split[0] + '='
split = split[1].rsplit('%', maxsplit=1)
cursor = int(split[0])
cursor = cursor - 1
# suffix = '%' + split[1]
url = prefix + str(cursor) # + suffix
print('get error detected. waiting 30s for retry')
time.sleep(30)
except TypeError:
slip_counter = slip_counter + 1
with open(Path(path) / '_slip.log', 'a') as file:
file.write(f'{url}\n')
if slip_counter >= slip_retries: break
print('slip error detected. waiting 30s for retry')
time.sleep(30)
if entity not in {'models'}: raise RuntimeError("Slip detected! Avoiding failed: NotImplemented")
split = url.rsplit('.', 1)
prefix = split[0] + '.'
split = split[1].split('%', 1)
suffix = '%' + split[1]
num = int(split[0])
if num < 999:
num = num + 1
else:
raise RuntimeError("Slip avoiding failed: Number overflow")
url = prefix + f'{num:03d}' + suffix
page = client.make_get_request(url).json()
try: next_page, next_cursor = cls.crawler_cursor_parse_metadata(page)
except RuntimeError: return page
if next_page != url: return page
else: raise RuntimeError("Slip avoiding failed: Not effective")
@classmethod
def crawler_cursor(cls, client: Client, entity: str, params: dict, save = True):
print(f"{datetime.datetime.now()} Fetching {entity}...")
path = Path(client.path) / ('fetch_' + entity)
items = dict()
url = f'{client.config.base_url}api/v1/{entity}{client.build_query_string(params)}'
first_page = client.make_get_request(url)
if not first_page:
with open(Path(client.path) / 'bugs.log', 'a') as f: f.write(url + '\n')
return list(items.values())
first_page = first_page.json()
if first_page.get('items', None):
for i in first_page.get('items', None): items[i['id']] = i
if save:
path.mkdir(exist_ok=True)
cls._save_json(path / 'first.json', [value for key, value in items.items()])
try: next_page, next_cursor = cls.crawler_cursor_parse_metadata(first_page)
except RuntimeError: return list(items.values())
cc = 0
while next_page:
next_page = cls.cursor_strip(next_page)
time.sleep(3)
# with open(Path(client.path) / 'bugs.log', 'a') as f:
# f.write(next_page + '\n')
page = cls.crawler_cursor_avoid_slip(client, next_page, path, entity)
if not page: return list(items.values())
page_items = page.get('items', None)
if page_items is None:
with open(Path(client.path)/'bugs.log', 'a') as f: f.write(next_page + '\n')
return list(items.values())
l = len(items)
for i in page_items: items[i['id']] = i
print(f"{datetime.datetime.now()} Fetched {len(items) - l}/{len(page_items)} {entity} from page {next_page}")
if len(items) - l == 0 and cc < 5:
print("Trying avoid images cursor corruption by request for new cursor")
next_page = cls.cursor_strip(next_page)
cc += 1
continue
else: cc = 0
if save: cls._save_json(path / f'page_{next_cursor}.json', page_items)
try: next_page, next_cursor = cls.crawler_cursor_parse_metadata(page)
except RuntimeError: break
if save: cls._save_json(path / f'all.json', items)
return [value for key,value in items.items()]
@classmethod
def models(cls, client: Client, subdir='fetch_models', save=True):
return cls.crawler_cursor(client, 'models', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw': 'true'}, save)
@classmethod
def images(cls, client: Client, subdir='fetch_images', save=True, start_with = None):
items = list()
if not start_with: start_with = 0
path = Path(client.path) / ('fetch_' + 'images')
if save: path.mkdir(exist_ok=True)
creators = [c.get('username', None) for c in cls.load(client, 'creators')]
counter = 1 + int(start_with)
c = ['nochekaiser881']
c.extend(creators[int(start_with):])
for username in creators[int(start_with):]:
# for username in ['yonne']:
time.sleep(3)
if not username: continue
page_items = cls.crawler_cursor(client, 'images', {
'period': 'AllTime', 'sort': 'Oldest', 'nsfw':'X', 'username': username, 'limit': '200', 'cursor': 0
}, save=False)
# page_items = cls._cursor_crawler(client, 'images', {
# 'period': 'AllTime', 'sort': 'Most%20Reactions', 'nsfw': 'X', 'username': username, 'limit': '200', 'cursor': 0
# }, save=False)
if len(page_items) >= 1000:
with open(path / '_1k.log', 'a') as f: f.write(username + '\n')
if len(page_items) >= 5000:
with open(path / '_5k.log', 'a') as f: f.write(username + '\n')
if len(page_items) >= 10000:
with open(path / '_10k.log', 'a') as f: f.write(username + '\n')
if len(page_items) >= 25000:
with open(path / '_25k.log', 'a') as f: f.write(username + '\n')
if len(page_items) >= 45000:
with open(path / '_giants_over_50k.log', 'a') as f: f.write(username + '\n')
print(f'Giant {username} has more than {len(page_items)} images, starting deep scan')
page_items_dict = dict()
for item in page_items: page_items_dict[item['id']] = item
print(f'Transferred {len(page_items_dict)} images of {len(page_items)}')
for sort in ['Newest', 'Most%20Reactions', 'Most%20Comments', 'Most%20Collected', ]:
page_items = cls.crawler_cursor(client, 'images',
{'period': 'AllTime', 'sort': sort, 'nsfw': 'X',
'username': username, 'limit': '200'}, save=False)
l = len(page_items_dict)
for item in page_items: page_items_dict[item['id']] = item
print(f'Added {len(page_items_dict) - l} images by {sort} sort crawl. {len(page_items_dict)} images total')
page_items = [value for key, value in page_items_dict.items()]
l = len(items)
#items.extend(page_items)
print(f"Fetched {len(page_items)} images by {username} ({counter}/{len(creators)})")
counter = counter + 1
if save: cls._save_json(path / f'{username}.json', page_items)
#if save: cls._save_json(path / 'aaa.json', items)
return items
def creators(self, save=True): return self.crawler_paginated('creators', save)
def tags(self, save=True): return self.crawler_paginated('tags', save)

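Behavior of the cursor helpers above, traced from the string handling (URLs are illustrative). The code treats everything before the first '%' after 'cursor=' as a sortable prefix and the rest as an opaque tail ('%7C' is an encoded '|'); cursor_strip keeps only the prefix:

url = 'https://civitai.com/api/v1/images?limit=200&cursor=1234%7CabcDEF'
Fetch.cursor_strip(url)  # 'https://civitai.com/api/v1/images?limit=200&cursor=1234'
Fetch.cursor_strip('https://civitai.com/api/v1/models?limit=200')  # unchanged: no 'cursor='
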
464
modules/civit/neofetch.py Normal file

@@ -0,0 +1,464 @@
import json
import os.path
import time
from dataclasses import dataclass
from typing import Any
import netifaces
from ping3 import ping
import requests
from requests import Response
from requests.exceptions import ConnectionError, Timeout, SSLError, ProxyError, RequestException
class NetworkError(Exception): pass
class ValidationError(Exception): pass
class CursorError(Exception): pass
class ContentTypeMismatchError(Exception): pass
class JsonParseError(Exception): pass
class EmptyDataError(Exception): pass
class EmptyItemsError(Exception): pass
class EmptyMetaWarning(Exception): pass
class CursorSlipError(Exception): pass
class DuplicateDataError(Exception): pass
class PoorDataWarning(Exception): pass
class CursorIncrementError(Exception): pass
@dataclass
class neofetch_error_counters:
network_timeout_error = 0
network_ssl_error = 0
network_proxy_error = 0
network_connection_error = 0
network_request_exception = 0
network_success = 0
@property
def network_errors(self): return (self.network_timeout_error + self.network_ssl_error + self.network_proxy_error +
self.network_connection_error + self.network_request_exception)
@property
def network_error_percentage(self): return float(self.network_errors) / float(self.network_success + self.network_errors) * 100 if self.network_success + self.network_errors != 0 else 0# %
def reset_network_stats(self):
self.network_timeout_error = 0
self.network_ssl_error = 0
self.network_proxy_error = 0
self.network_connection_error = 0
self.network_request_exception = 0
self.network_success = 0
http_unavailable = 0
http_other = 0
http_success = 0
@property
def http_errors(self): return self.http_unavailable + self.http_other
@property
def http_error_percentage(self): return float(self.http_errors) / float(self.http_success + self.http_errors) * 100 if self.http_success + self.http_errors != 0 else 0# %
def reset_http_stats(self):
self.http_unavailable = 0
self.http_other = 0
self.http_success = 0
json_type_mismatch = 0
json_parse_error = 0
json_success = 0
@property
def json_errors(self): return self.json_type_mismatch + self.json_parse_error
@property
def json_error_percentage(self): return float(self.json_errors) / float(self.json_success + self.json_errors) * 100 if self.json_success + self.json_errors != 0 else 0 # %
def reset_json_stats(self):
self.json_type_mismatch = 0
self.json_parse_error = 0
self.json_success = 0
class neofetch_collector:
def __init__(self, start_number = 0, autosave = False, save_path = '', autosave_chunk_size = 2000):
self.items: dict[str, dict] = dict()
self.pending_items: dict[str, dict] = dict()
self.autosave = autosave
if autosave and save_path == '': raise ValueError('autosave mode is enabled, but path is not specified')
self.save_path = save_path
self.current_number = start_number
self.autosave_chunk_size = autosave_chunk_size
def check_autosave(self):
if len(self.pending_items) < self.autosave_chunk_size: return 0
self.save()
def save(self, path = None, flush = False):
if not path: path = self.save_path
if self.autosave:
if len(self.pending_items) == 0: return 0
pending_items: list = [value for key, value in self.pending_items.items()]
self.pending_items = dict()
else:
if len(self.items) == 0: return 0
pending_items: list = [value for key, value in self.items.items()]
path = os.path.join(path, f'{self.current_number}-{len(self.items)}.json')
with open(path, "w", encoding="utf-8") as f:
json.dump(pending_items, f, indent=4, ensure_ascii=False)
self.current_number = len(self.items)
if flush: self.flush()
return len(pending_items)
def flush(self):
if self.save_path != '': self.save()
self.items = dict()
self.pending_items = dict()
@staticmethod
def _cast_items(items: dict | list[dict] | set[dict], pk ='id') -> dict[str, dict]:
result: dict[str, dict] = dict()
if isinstance(items, list): pass
elif isinstance(items, dict): items = [items]
elif isinstance(items, set): items = list(items)
for item in items: result[str(item.get(pk, 'None'))] = item
return result
def _compare(self, items: dict[str, dict]) -> int: return len(set(items) - set(self.items))
def compare(self, items: dict | list[dict] | set[dict], pk ='id') -> int:
return len(set(self._cast_items(items, pk)) - set(self.items))
def add(self, items: dict | list[dict] | set[dict], pk ='id') -> int:
items = self._cast_items(items, pk)
new_items_count = self._compare(items)
new_items = set(items) - set(self.items)
self.items.update(items)
if self.autosave:
for key in new_items: self.pending_items[key] = items[key]
self.check_autosave()
return new_items_count
@property
def list(self): return [value for key, value in self.items.items()]
@property
def set(self):
return {value for key, value in self.items.items()}
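# Usage sketch (illustrative): the collector deduplicates by primary key, and
# add() reports how many of the supplied items were actually new.
#   c = neofetch_collector()
#   assert c.add([{'id': 1}, {'id': 2}]) == 2
#   assert c.add([{'id': 2}, {'id': 3}]) == 1  # id 2 was already collected
#   assert len(c.list) == 3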
class neofetch_cursor:
def __init__(self, url: str):
self.url = url
self.stripped = True
def update(self, next_page, stripped = False):
self.url = next_page
self.stripped = stripped
def strip(self):
split = self.url.split('cursor=', maxsplit=1)
if len(split) < 2: return self.url
prefix = split[0] + 'cursor='
suffix = split[1].split('%', maxsplit=1)[0]
self.url = prefix + suffix
return self.url
@staticmethod
def _order(number):
mask = 10
while number > mask: mask *= 10
return mask
def increment(self, count = 1):
split = self.url.split('cursor=', maxsplit=1)
if len(split) < 2: return self.url
prefix = split[0] + 'cursor='
split = split[1].rsplit('%', maxsplit=1)
if len(split) >= 2: suffix = '%' + split[1]
else: suffix = ''
split = split[0].rsplit('.', maxsplit=1)
if len(split) >= 2:
prefix += split[0] + '.'
cursor = split[1]
else: cursor = split[0]
cursor_order = len(cursor)
cursor = int(cursor)
incremented_cursor = cursor + count
if incremented_cursor < pow(10, cursor_order): cursor = incremented_cursor
else: raise CursorIncrementError(f'cursor has reached bounds: {pow(10, cursor_order)}')
        self.url = f'{prefix}{cursor:0{cursor_order}d}{suffix}'
return self.url
def decrement(self, count = 1): return self.increment(-count)
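# Usage sketch (illustrative): a Civitai cursor looks like
# 'cursor=2022-11-16%2023%3A31%3A28.203%7C162'. strip() cuts the cursor at the
# first '%', dropping the time component, while increment() bumps the trailing
# digit group and keeps its zero-padded width.
#   cur = neofetch_cursor('https://civitai.com/api/v1/models?cursor=2022-11-16%2023%3A31%3A28.203%7C162')
#   cur.increment(10)  # ...cursor=2022-11-16%2023%3A31%3A28.213%7C162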
class neofetch:
def __init__(self, path, base_url = None, session=None):
self.path = path
self.base_url = base_url or 'https://civitai.com'
self.session = session
self.errors = neofetch_error_counters()
self.tags_collector = neofetch_collector(autosave=True, save_path=os.path.join(path, 'fetch', 'tags'))
self.creators_collector = neofetch_collector(autosave=True, save_path=os.path.join(path, 'fetch', 'creators'))
self.models_collector = neofetch_collector(autosave=True, save_path=os.path.join(path, 'fetch', 'models'))
self.images_collector = neofetch_collector(autosave=True, save_path=os.path.join(path, 'fetch', 'images'))
@staticmethod
def check_network(hostname = None) -> bool:
# check gateway
p = ping(netifaces.gateways()['default'][netifaces.AF_INET][0])
if p: print('[Network check/INFO] gateway is reachable')
else: print('[Network check/WARN] gateway unreachable or ping is not allowed')
# check wan
p = ping('1.1.1.1')
if p: print('[Network check/INFO] WAN is reachable')
else:
print('[Network check/ERR] WAN is unreachable')
return False
# check DNS
p = ping('google.com')
if p: print('[Network check/INFO] DNS is working')
else:
print('[Network check/ERR] DNS is unreachable')
return False
        if not hostname:
            print('[Network check/WARN] target not specified. skipping')
            return True
# check target
p = ping(hostname)
if p:
print('[Network check/INFO] site host is up')
else:
print('[Network check/ERR] site host is unreachable')
raise NetworkError('[Network check/ERR] site host is unreachable')
# check site working
try:
response = requests.get('https://' + hostname)
print('[Network check/INFO] site is responding to HTTP requests')
return True
except RequestException as e:
raise NetworkError from e
def wait_for_network(self, hostname):
while not self.check_network(hostname):
print('Waiting for network...')
time.sleep(30)
def network_garant(self, url, session=None, headers=None, retries = 10) -> requests.models.Response | None:
if retries <= 0: raise ValidationError("Network error correction failed")
exception_occurred = False
try:
if session: r = session.get(url)
elif headers: r = requests.get(url, headers=headers)
else: r: requests.models.Response = requests.get(url)
if not isinstance(r, requests.models.Response): raise ValidationError(
f'response has type {type(r)} but requests.models.Response is required'
)
return r
except Timeout as e:
            # Connection/read timeout
print("Timeout:", e)
self.errors.network_timeout_error += 1
exception_occurred = True
except SSLError as e:
            # TLS handshake problems
print("SSL error:", e)
self.errors.network_ssl_error += 1
exception_occurred = True
except ProxyError as e:
            # Proxy error (often a sub-case of ConnectionError, but handled separately)
print("Proxy error:", e)
self.errors.network_proxy_error += 1
exception_occurred = True
except ConnectionError as e:
            # Connection errors: DNS failure, unreachable host, RST, proxy failure, etc.
print("Connection failed:", e)
self.errors.network_connection_error += 1
exception_occurred = True
except RequestException as e:
            # Any other unexpected request error
print("General request error:", e)
self.errors.network_request_exception += 1
exception_occurred = True
finally:
if exception_occurred:
try: self.wait_for_network(str(url).split('//', maxsplit=1)[1].split('/', maxsplit=1)[0])
except Exception as e: self.wait_for_network(hostname=None)
return self.network_garant(url, session, headers, retries-1)
else: self.errors.network_success += 1
def http_garant(self, url, session=None, headers=None, retries = 10, service_available_retries = 720):
if retries <= 0: raise ValidationError("HTTP error correction failed")
try:
response = self.network_garant(url, session, headers)
response.raise_for_status()
self.errors.http_success += 1
return response
except requests.exceptions.HTTPError as e:
status = e.response.status_code
            if status == 503:
                self.errors.http_unavailable += 1
                print("[http_garant/WARN] HTTP error, waiting availability:", e)
                time.sleep(60)
                if service_available_retries > 0: return self.http_garant(url, session, headers, retries, service_available_retries - 1)
                else: raise CursorError from e
            else:
                self.errors.http_other += 1
                print("[http_garant/ERR] HTTP error:", e)
                time.sleep(10)
                if retries > 1: return self.http_garant(url, session, headers, retries - 1, service_available_retries)
                else: raise CursorError from e
def json_garant(self, url, session=None, headers=None, retries = 10):
if retries <= 0: raise ValidationError("JSON parse error correction failed")
try:
response = self.http_garant(url, session, headers)
ct = response.headers.get("Content-Type", "")
if not ct.lower().startswith("application/json"): raise ContentTypeMismatchError
j = response.json()
self.errors.json_success += 1
return j
except ContentTypeMismatchError:
self.errors.json_type_mismatch += 1
print("[json_garant/ERR] HTTP error")
time.sleep(10)
return self.json_garant(url, session, headers, retries - 1)
except ValueError as e:
self.errors.json_parse_error += 1
print("[json_garant/ERR] HTTP error:", e)
time.sleep(10)
return self.json_garant(url, session, headers, retries - 1)
    def api_data_garant(self, url, collector: neofetch_collector, session=None, headers=None, retries = 10):
if retries <= 0: raise ValidationError("API data error correction failed")
try:
response = self.json_garant(url, session, headers)
if 'items' not in response or 'metadata' not in response: raise EmptyDataError
items = response['items']
metadata = response['metadata']
del response
if len(items) == 0 and len(metadata) == 0: raise EmptyDataError
elif len(items) == 0: raise EmptyItemsError
elif len(metadata) == 0: raise EmptyMetaWarning
            if 'nextPage' not in metadata: raise EmptyMetaWarning('Metadata has no nextPage field')
else: next_page = metadata['nextPage']
if 'totalPages' in metadata: total_pages = metadata['totalPages']
else: total_pages = None
if next_page and next_page == url: raise CursorSlipError
new_items_count = collector.compare(items)
new_items_percentage = float(new_items_count) / float(len(items)) * 100
if new_items_count == 0: raise DuplicateDataError
elif new_items_percentage < 50: raise PoorDataWarning
return items, next_page, total_pages
except EmptyDataError:
print('[api_data_garant/ERR] EmptyDataError: Empty api response')
time.sleep(10)
if retries > 1:
return self.api_data_garant(url, collector, session, headers, retries - 1)
else: raise CursorError
except EmptyItemsError:
            print('[api_data_garant/ERR] EmptyItemsError: api response has no items')
time.sleep(10)
if retries > 1:
return self.api_data_garant(url, collector, session, headers, retries - 1)
else:
raise CursorError
except EmptyMetaWarning:
print('[api_data_garant/WARN] EmptyMetaWarning')
return items, None, None
except DuplicateDataError:
print('[api_data_garant/ERR] DuplicateDataError')
if retries > 1:
return self.api_data_garant(url, collector, session, headers, retries - 1)
else:
raise CursorSlipError
except PoorDataWarning:
print('[api_data_garant/WARN] PoorDataWarning')
return items, next_page, total_pages
def cursor_garant(self, cursor: neofetch_cursor, collector: neofetch_collector, session=None, headers=None, retries = 10):
if retries <= 0: raise ValidationError("Cursor error correction failed")
try:
return self.api_data_garant(cursor.url, collector, session, headers)
except CursorError:
print('[cursor_garant/ERR] CursorError')
if not cursor.stripped:
time.sleep(10)
cursor.strip()
return self.cursor_garant(cursor, collector, session, headers, retries - 1)
elif retries > 5: return self.cursor_garant(cursor, collector, session, headers, retries - 1)
else:
cursor.decrement(2)
return self.cursor_garant(cursor, collector, session, headers, retries - 1)
except CursorSlipError:
print('[cursor_garant/ERR] CursorSlipError')
if not cursor.stripped:
time.sleep(10)
cursor.strip()
return self.cursor_garant(cursor, collector, session, headers, retries - 1)
elif retries > 5: return self.cursor_garant(cursor, collector, session, headers, retries - 1)
else:
cursor.increment(1)
return self.cursor_garant(cursor, collector, session, headers, retries - 1)
def validation_garant(self, cursor: neofetch_cursor, collector: neofetch_collector, session=None, headers=None, retries = 10):
try: return self.cursor_garant(cursor, collector, session, headers)
except ValidationError as e:
# TODO log error
if retries > 0: return self.validation_garant(cursor, collector, session, headers, retries - 1)
else: raise RuntimeError from e
except CursorIncrementError as e: raise RuntimeError from e
def crawler(self, next_page, collector: neofetch_collector, session, type: str, start_number = 0):
cur = neofetch_cursor(next_page)
collector.current_number = start_number
total_pages = None
while next_page:
            print(f'Fetching {type}: page {next_page}' + (f' of {total_pages}' if total_pages else ''))
try: items, next_page, total_pages = self.validation_garant(cur, collector, session)
except RuntimeError:
# TODO log error
break
cur.update(next_page)
collector.add(items)
collector.save()
def tags(self, start_number=0):
        return self.crawler(next_page=self.base_url + '/api/v1/tags?limit=200', collector=self.tags_collector,
                            session=self.session, type='tags', start_number=start_number)
def creators(self, start_number=0):
        return self.crawler(next_page=self.base_url + '/api/v1/creators?limit=200', collector=self.creators_collector,
                            session=self.session, type='creators', start_number=start_number)
def models(self, start_number=0):
        return self.crawler(next_page=self.base_url + '/api/v1/models?period=AllTime&sort=Oldest&nsfw=true&limit=200',
                            collector=self.models_collector, session=self.session, type='models', start_number=start_number)
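# The *_garant helpers form a layered retry chain, each level repairing its own
# error class and escalating what it cannot fix:
#   network_garant -> http_garant -> json_garant -> api_data_garant
#     -> cursor_garant -> validation_garant -> crawler
# Minimal usage sketch (illustrative; assumes the target path is writable):
#   nf = neofetch('/tmp/civit')
#   nf.tags()
#   nf.creators()
#   nf.models()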
if __name__ == '__main__':
n = neofetch_cursor('https://civitai.com/api/v1/models?period=AllTime&sort=Oldest&nsfw=true&cursor=2022-11-16%2023%3A31%3A28.203%7C162')
n.increment(10)
pass


@@ -0,0 +1,238 @@
import datetime
from dataclasses import dataclass, fields
from typing import Optional, List, get_origin
from .DataClassJson import DataClassJson
from modules.shared.DatabaseAbstraction import Cursor
types = {bool: 'INTEGER', int: 'INTEGER', float: 'REAL', str: "TEXT",
Optional[bool]: 'INTEGER', Optional[int]: 'INTEGER', Optional[float]: 'REAL', Optional[str]: "TEXT", }
@dataclass
class DataClassDatabase(DataClassJson):
    _standalone_entity: Optional[bool] = None
    _table_name: Optional[str] = None
def __post_init__(self):
super().__post_init__()
@classmethod
def get_create_sqls(cls, table_name = None):
tmp_instance = cls()
if not table_name: table_name = tmp_instance._table_name
pk_type = str
for field in fields(tmp_instance):
if field.name == tmp_instance._key_field:
pk_type = field.type
result: list[str] = list()
        result.append(f'CREATE TABLE IF NOT EXISTS "{table_name}" (fk INTEGER NOT NULL, pk {types.get(pk_type, "INTEGER")} NOT NULL, PRIMARY KEY(pk, fk));')
        result.append(f'CREATE TABLE IF NOT EXISTS "{table_name}_archive" (fk INTEGER NOT NULL, pk {types.get(pk_type, "INTEGER")} NOT NULL, save_date TEXT NOT NULL, PRIMARY KEY(pk, fk, save_date));')
excluded_fields = {f.name for f in fields(DataClassDatabase)}
all_fields = [f for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')]
for field in all_fields:
if field.name in tmp_instance._forwarding:
inner_type: type = tmp_instance._forwarding[field.name]
try: result.extend(inner_type.get_create_sqls())
except Exception as e: raise RuntimeError('invalid forwarding type') from e
elif field.type in { list, Optional[list], Optional[List] }:
result.append(f'CREATE TABLE IF NOT EXISTS "{table_name}_{field.name}" (fk TEXT NOT NULL, data TEXT NOT NULL, PRIMARY KEY(data, fk));')
else:
                result.append(f'ALTER TABLE "{table_name}" ADD COLUMN "{field.name}" {types.get(field.type, "TEXT")};')
                result.append(f'ALTER TABLE "{table_name}_archive" ADD COLUMN "{field.name}" {types.get(field.type, "TEXT")};')
return result
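    # Illustrative output of get_create_sqls() for a 'model' table with an
    # Optional[int] primary key and one scalar Optional[str] field 'name':
    #   CREATE TABLE IF NOT EXISTS "model" (fk INTEGER NOT NULL, pk INTEGER NOT NULL, PRIMARY KEY(pk, fk));
    #   CREATE TABLE IF NOT EXISTS "model_archive" (fk INTEGER NOT NULL, pk INTEGER NOT NULL, save_date TEXT NOT NULL, PRIMARY KEY(pk, fk, save_date));
    #   ALTER TABLE "model" ADD COLUMN "name" TEXT;
    #   ALTER TABLE "model_archive" ADD COLUMN "name" TEXT;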
@classmethod
def create(cls, cur: Cursor):
for sql in cls.get_create_sqls():
try: cur.execute(sql)
except Exception as e: print(e)
@classmethod
def load(cls, cur: Cursor, pk=None, fk=None, depth = 5):
if not pk and not fk: return list()
params = list()
instance = cls()
sql = f'SELECT pk, fk FROM "{instance._table_name}"'
if pk or fk: sql += ' WHERE'
if pk:
params.append(pk)
sql += ' pk = ?'
if pk and fk: sql += ' AND'
if fk:
params.append(fk)
sql += ' fk = ?'
res: list[dict] = cur.fetchall(sql, params)
del pk, fk, sql, params
results = list()
for r in res:
item = cls._load(cur, r.get('pk', None), r.get('fk', None), depth)
if item: results.append(item)
return results
@classmethod
def _load(cls, cur: Cursor, pk, fk, depth = 5):
if not pk and not fk: return None
instance = cls()
res: dict = cur.fetchone(f'SELECT * FROM "{instance._table_name}" WHERE pk = ? AND fk = ?', [pk, fk])
if not res: return None
rpk = res.pop('pk')
rfk = res.pop('fk')
result = cls.from_dict(res)
if depth == 0: return result
for field in fields(cls):
if field.name in instance._forwarding:
items = instance._forwarding[field.name].load(cur, fk=rpk, depth=depth - 1)
                if len(items) > 1: setattr(result, field.name, items)  # TODO: remove this workaround
elif len(items) > 0: setattr(result, field.name, items[0])
elif field.type in {list, List, Optional[list], Optional[List]}:
items = cur.fetchall(f'SELECT data from "{instance._table_name}_{field.name}" WHERE fk=?', [rpk])
if items:
items = [row['data'] for row in items]
else:
items = list()
setattr(result, field.name, items)
return result
def save(self, cur: Cursor, fk = None):
if self._standalone_entity: fk = 0
elif not fk: raise RuntimeError('Trying to save child entity as standalone')
pk = self.key if self._key_field != 'key' else 0
prev = self._load(cur, pk=pk, fk=fk, depth=0)
if prev:
for field in self.serializable_fields():
setattr(self, field.name, getattr(self, field.name) or getattr(prev, field.name))
if prev and not self.equals_simple(prev):
d = str(datetime.datetime.now())
cur.execute(f'INSERT OR IGNORE INTO "{prev._table_name}_archive" (fk, pk, save_date) VALUES (?, ?, ?)', [fk, pk, d])
for field in prev.serializable_fields():
attr = getattr(prev, field.name)
if field.name in prev._forwarding: continue
elif field.type in {list, List, Optional[list], Optional[List]} or isinstance(attr, list): continue
else:
cur.execute(f'UPDATE "{prev._table_name}_archive" SET {field.name}=? WHERE fk=? AND pk=? AND save_date=?', [attr, fk, pk, d])
cur.execute(f'INSERT OR IGNORE INTO "{self._table_name}" (fk, pk) VALUES (?, ?)', [fk, pk])
for field in self.serializable_fields():
attr = getattr(self, field.name)
if not attr: continue
if field.name in self._forwarding:
if not isinstance(getattr(self, field.name), list): attr = [attr]
for val in attr:
                    val.save(cur, fk=pk)
continue
elif field.type in {list, List, Optional[list], Optional[List]} or isinstance(attr, list):
for val in attr: cur.execute(f'INSERT OR IGNORE INTO "{self._table_name}_{field.name}" VALUES (?, ?)', [pk, val])
continue
else:
cur.execute(f'UPDATE "{self._table_name}" SET "{field.name}"=? WHERE fk=? AND pk=?', [attr, fk, pk])
continue
def equals_simple(self, obj):
for field in self.serializable_fields():
if field.name in self._forwarding: continue
elif field.type in {list, List, Optional[list], Optional[List]}: continue
if getattr(self, field.name) != getattr(obj, field.name):
return False
return True
if __name__ == '__main__':
@dataclass
class ModelStats(DataClassDatabase):
downloadCount: Optional[int] = None
favoriteCount: Optional[int] = None
thumbsUpCount: Optional[int] = None
thumbsDownCount: Optional[int] = None
commentCount: Optional[int] = None
ratingCount: Optional[int] = None
rating: Optional[int] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._table_name = 'model_stats'
@dataclass
class Model(DataClassDatabase):
id: Optional[int] = None
name: Optional[str] = None
description: Optional[str] = None
allowNoCredit: Optional[bool] = None
allowCommercialUse: Optional[list] = None
allowDerivatives: Optional[bool] = None
allowDifferentLicense: Optional[bool] = None
type: Optional[str] = None
minor: Optional[bool] = None
sfwOnly: Optional[bool] = None
poi: Optional[bool] = None
nsfw: Optional[bool] = None
nsfwLevel: Optional[int] = None
availability: Optional[str] = None
cosmetic: Optional[str] = None
supportsGeneration: Optional[bool] = None
stats: Optional[ModelStats] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {
'stats': ModelStats,
}
self._key_field = 'id'
self._table_name = 'model'
self._standalone_entity = True
for s in Model.get_create_sqls():
print(s)
from modules.shared.DatabaseAbstraction import Database, Cursor
from modules.shared.DatabaseSqlite import SQLiteDatabase, SQLiteCursor
    db = SQLiteDatabase('source_db', '/tmp')
Model.create(db.cursor())
db.commit()
m = Model.load(db.cursor(), pk=42)
    pdb = SQLiteDatabase('target_db', '/tmp')
Model.create(pdb.cursor())
pdb.commit()
m0: Model = m[0]
m0.save(pdb.cursor())
pdb.commit()
m0.description = 'Abobus - avtobus'
m0.save(pdb.cursor())
pdb.commit()
pass


@@ -0,0 +1,190 @@
from dataclasses import dataclass, field, fields
from typing import Dict, Any, Optional
import warnings
# Base class for convenient inheritance
@dataclass
class DataClassJson:
_forwarding: Dict[str, type] = field(default_factory=dict)
    _key_field: str = 'key'  # Field that will be used as the key
fixed: bool = False
    # Hidden fields for data storage
_key: Optional[str] = None
other_data: Optional[Dict[str, Any]] = None
    # Example field that participates in _forwarding
    # Should be overridden in child classes
key: Optional[str] = None
def __post_init__(self):
if self._key is not None:
self.key = self._key
@property
def key(self) -> Optional[str]:
return self._key
@key.setter
def key(self, value: str):
self._key = value
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'DataClassJson':
        # Create a class instance
instance = cls()
instance.fixed = data.get('fixed', False)
instance.other_data = None
        # Collect the set of declared field names
excluded_fields = {f.name for f in fields(DataClassJson)}
all_fields = {f.name for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')}
        # Handle fields listed in _forwarding
handled_keys = set()
field_values = {}
for key, value in data.items():
if key in handled_keys:
continue
if key in instance._forwarding:
target_type = instance._forwarding[key]
if isinstance(value, dict):
                    # Handle a nested dict
sub_instance = target_type.from_dict(value)
field_values[key] = sub_instance
handled_keys.add(key)
elif isinstance(value, list):
                    # Handle a list of dicts
results = []
for item in value:
if isinstance(item, dict):
sub_instance = target_type.from_dict(item)
results.append(sub_instance)
else:
                            # Non-dict element: stash it in other_data
                            warnings.warn(f"Non-dict value {item} in list for field '{key}' will be added to 'other_data'")
                            if instance.other_data is None:
                                instance.other_data = {}
                            instance.other_data[key] = item  # keep the original value
field_values[key] = results
handled_keys.add(key)
else:
                    # Neither dict nor list: also goes to other_data
warnings.warn(f"Non-dict/list value {value} for field '{key}' will be added to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
else:
                # Regular field
if key in all_fields:
field_values[key] = value
handled_keys.add(key)
else:
                    # Unknown field: add to other_data
warnings.warn(f"Unknown field '{key}', adding to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
        # Fill in regular fields
for key, value in field_values.items():
setattr(instance, key, value)
        # Set the key, if present
if hasattr(instance, '_key_field') and instance._key_field in data:
instance.key = data[instance._key_field]
        # Check the fixed flag against other_data
if instance.fixed and instance.other_data is not None:
raise ValueError("Cannot serialize with fixed=True and non-empty other_data")
return instance
@classmethod
def serializable_fields(cls):
excluded_fields = {f.name for f in fields(DataClassJson)}
return {f for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')}
def to_dict(self) -> Dict[str, Any]:
result = {}
excluded_fields = {f.name for f in fields(DataClassJson)}
field_names = [f.name for f in fields(self) if f.name not in excluded_fields and not f.name.startswith('_')]
for field_name in field_names:
if not hasattr(self, field_name):
result[field_name] = None
                warnings.warn(f'object has no field {field_name}, something went wrong')
continue
value = getattr(self, field_name)
if not value:
result[field_name] = None
                warnings.warn(f'field {field_name} holds no data, which may be a correct situation')
continue
if field_name in self._forwarding:
target_type = self._forwarding[field_name]
result[field_name] = list()
single = False
if not isinstance(value, list):
single = True
value = [value]
for v in value:
try:
v = v.to_dict()
except Exception as e:
warnings.warn(str(e))
finally:
result[field_name].append(v)
if single: result[field_name] = result[field_name][0]
continue
else: result[field_name] = value
        # Merge in other_data, if present
if self.other_data and isinstance(self.other_data, dict):
for key, value in self.other_data.items():
if key not in result:
result[key] = value
else:
if not isinstance(result[key], list): result[key] = [result[key]]
if not isinstance(value, list): value = [value]
result[key].extend(value)
return result
# Usage example:
@dataclass
class Person(DataClassJson):
name: Optional[str] = None
age: Optional[int] = None
email: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'name'
@dataclass
class User(DataClassJson):
id: Optional[list] = None
username: Optional[str] = None
person: Optional[Person] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {'person': Person}
self._key_field = 'username'
# Deserialization example:
if __name__ == "__main__":
data = {
"id": [1,2,3,4,5,6],
"username": "user1",
"person": None,
"extra_field": "should_be_in_other_data"
}
user = User.from_dict(data)
data2 = user.to_dict()
print(user.to_dict())


@@ -0,0 +1,52 @@
class Cursor:
def __init__(self, cursor):
pass
def execute(self, sql: str, params: list = None) -> None:
pass
def fetchone(self, sql: str, params: list = None) -> dict:
pass
def fetchmany(self, sql: str = None, params: list = None) -> list[dict]:
pass
def fetchall(self, sql: str, params: list = None) -> list[dict]:
pass
def lastrowid(self):
pass
class Database:
def __init__(self, name: str):
self.name = name
self.connected = False
def commit(self):
pass
def cursor(self) -> Cursor:
pass
class DBContainer:
def __init__(self, db: Database):
self.db: Database = db
def switch_db(self, db: Database):
self.db.commit()
self.db: Database = db
@property
def connected(self) -> bool:
return self.db.connected
def commit(self):
self.db.commit()
def cursor(self) -> Cursor:
return self.db.cursor()


@@ -0,0 +1,91 @@
from pathlib import Path
from .DatabaseAbstraction import Database, Cursor
import sqlite3 as sq
class SQLiteCursor(Cursor):
def __init__(self, cursor):
super().__init__(cursor)
self._cursor = cursor
def execute(self, sql: str, params: list = None) -> None:
"""Выполняет SQL запрос"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
def fetchone(self, sql: str, params: list = None) -> dict:
"""Получает одну строку результата"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
row = self._cursor.fetchone()
if row is None:
return None
        # Convert to a dict keyed by column name
columns = [description[0] for description in self._cursor.description]
return dict(zip(columns, row))
def fetchmany(self, sql: str = None, params: list = None) -> list[dict]:
"""Получает несколько строк результата"""
if sql is not None:
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
rows = self._cursor.fetchmany()
if not rows:
return []
        # Convert to a list of dicts
columns = [description[0] for description in self._cursor.description]
return [dict(zip(columns, row)) for row in rows]
def fetchall(self, sql: str, params: list = None) -> list[dict]:
"""Получает все строки результата"""
if params is None:
self._cursor.execute(sql)
else:
self._cursor.execute(sql, params)
rows = self._cursor.fetchall()
if not rows:
return []
        # Convert to a list of dicts
columns = [description[0] for description in self._cursor.description]
return [dict(zip(columns, row)) for row in rows]
def lastrowid(self):
"""Возвращает ID последней вставленной строки"""
return self._cursor.lastrowid
class SQLiteDatabase(Database):
def __init__(self, name: str, path = '.'):
super().__init__(name)
self._connection: sq.Connection = sq.connect(Path(path) / (name + '.db'))
        self._connection.autocommit = True  # sqlite3 autocommit attribute (Python 3.12+)
        self._connection.row_factory = sq.Row  # dict-like row access
self.connected = True
def commit(self):
"""Фиксирует транзакцию"""
if self.connected and self._connection:
self._connection.commit()
def cursor(self) -> Cursor:
"""Создает и возвращает курсор"""
return SQLiteCursor(self._connection.cursor())
def close(self):
"""Закрывает соединение с базой данных"""
if self.connected and self._connection:
self._connection.close()
self.connected = False
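# Minimal smoke test (illustrative, not part of the original file; assumes /tmp
# is writable). Demonstrates the dict-based cursor API above.
if __name__ == '__main__':
    db = SQLiteDatabase('demo', '/tmp')
    cur = db.cursor()
    cur.execute('CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, name TEXT)')
    cur.execute('INSERT INTO t (name) VALUES (?)', ['alpha'])
    db.commit()
    print(cur.fetchone('SELECT * FROM t WHERE name = ?', ['alpha']))  # e.g. {'id': 1, 'name': 'alpha'}
    db.close()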


@@ -0,0 +1,249 @@
import datetime
import json
import time
import os
import warnings
from collections import defaultdict, Counter
from typing import Dict, List, Any
from pathlib import Path
from modules.civit.client import Client
class DatamodelBuilderSimple:
def __init__(self):
self.field_analysis = {}
self.field_analysis_low_ram: dict[str, int] = dict()
@staticmethod
def _get_json_files(directory_path: str) -> List[str]:
"""Получает список всех JSON файлов в директории"""
json_files = []
for filename in os.listdir(directory_path):
if filename.endswith('.json'):
json_files.append(os.path.join(directory_path, filename))
return json_files
@staticmethod
def _load_json_data(file_path: str) -> List[Dict]:
"""Загружает данные из JSON файла"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
return data
else:
return [data]
except (json.JSONDecodeError, IOError) as e:
print(f"Ошибка чтения файла {file_path}: {e}")
return []
def _collect_all_entities(self, directory_path: str) -> List[Dict]:
"""Собирает все экземпляры из всех JSON файлов"""
all_entities = []
json_files = self._get_json_files(directory_path)
for file_path in json_files:
entities = self._load_json_data(file_path)
all_entities.extend(entities)
return all_entities
@staticmethod
def _get_field_types(value: Any) -> str:
"""Определяет тип значения"""
if isinstance(value, dict):
return 'dict'
elif isinstance(value, list):
return 'list'
elif isinstance(value, bool):
return 'bool'
elif isinstance(value, int):
return 'int'
elif isinstance(value, float):
return 'float'
elif isinstance(value, str):
return 'str'
else:
return 'unknown'
@staticmethod
def _get_main_type(types: List[str]) -> str:
"""Определяет основной тип из списка типов"""
if not types:
return 'unknown'
        # dict or list present means a complex structure
if 'dict' in types or 'list' in types:
return 'complex'
        # otherwise return the single type, or mark it as mixed
unique_types = set(types)
if len(unique_types) == 1:
return types[0]
else:
return 'mixed'
@staticmethod
def _is_hashable(value: Any) -> bool:
"""Проверяет, является ли значение хэшируемым"""
try:
hash(value)
return True
except TypeError:
return False
@classmethod
def _serialize_value_for_counter(cls, value: Any) -> str:
"""Преобразует значение в строку для использования в Counter"""
if cls._is_hashable(value):
return value
else:
            # use the string representation for unhashable types
return str(value)
def _analyze_fields_recursive(self, entity: Dict, parent_path: str,
field_types: Dict, field_presence: Dict,
field_values: Dict, top_n: int):
"""Рекурсивно анализирует поля сущности"""
if not isinstance(entity, dict):
return
for key, value in entity.items():
field_path = f"{parent_path}.{key}" if parent_path else key
            # record the field type
field_types[field_path].append(self._get_field_types(value))
            # mark field presence
field_presence[field_path].append(True)
            # store the value for frequency counting (handles unhashable types)
if value is not None:
serialized_value = self._serialize_value_for_counter(value)
field_values[field_path].append(serialized_value)
            # recurse into nested structures
if isinstance(value, dict):
self._analyze_fields_recursive(value, field_path, field_types,
field_presence, field_values, top_n)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
self._analyze_fields_recursive(item, field_path, field_types,
field_presence, field_values, top_n)
def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]:
"""Анализирует структуру всех сущностей"""
if not entities:
return {}
        # collect all fields and their types
field_types = defaultdict(list)
field_presence = defaultdict(list)
field_values = defaultdict(list)
for entity in entities:
self._analyze_fields_recursive(entity, "", field_types, field_presence,
field_values, top_n)
        # build the final analysis
result = {}
for field_path, types in field_types.items():
            # determine the main type
main_type = self._get_main_type(types)
            # count how often the field is present
presence_count = len(field_presence[field_path])
total_count = len(entities)
always_present = presence_count == total_count
            # take the top N values
top_values = []
if field_path in field_values:
try:
                    # count serialized values and keep the most common
value_counter = Counter(field_values[field_path])
top_values = [item[0] for item in value_counter.most_common(top_n)]
except Exception:
                    # on error, fall back to an empty list
top_values = []
result[field_path] = {
'type': main_type,
'always_present': always_present,
'top_values': top_values,
'total_count': total_count,
'presence_count': presence_count
}
return result
def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]:
"""
Основной метод анализа директории
Args:
directory_path: Путь к директории с JSON файлами
top_n: Количество самых частых значений для каждого поля
Returns:
Словарь с анализом структуры данных
"""
        # Step 1: collect all entities from the JSON files
entities = self._collect_all_entities(directory_path)
        # Step 2: analyze the entity structure
self.field_analysis = self._analyze_entity_structure(entities, top_n)
return self.field_analysis
def analyze_directory_low_ram(self, directory_path: str, dump = None):
json_files = self._get_json_files(directory_path)
i = 0
files_count = len(json_files)
for file_path in json_files:
i += 1
print(f'processing file {i} of {files_count}: {file_path}')
entities = self._load_json_data(file_path)
for entity in entities:
self.analyze_recursive_low_ram(entity)
# del entity, entities
sorted_items = sorted(self.field_analysis_low_ram.items(), key=lambda item: item[1])
result = [f'{item[0]} => {item[1]}' for item in sorted_items]
if dump:
with open(dump, 'w') as f:
for res in result:
f.write(res + '\n')
for res in result:
print(res)
def analyze_recursive_low_ram(self, entity: dict, prefix = ''):
for key, value in entity.items():
if not isinstance(value, list): value = [value]
for v in value:
if isinstance(v, dict): self.analyze_recursive_low_ram(v, prefix=prefix + key + '.')
else: self.field_analysis_low_ram[prefix + key] = self.field_analysis_low_ram.get(prefix + key, 0) + 1
# del v
# del key, value
if __name__ == '__main__':
d = DatamodelBuilderSimple()
d.analyze_directory_low_ram(input("Directory path: "), input("Dump file path: "))


@@ -0,0 +1,74 @@
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Deque
WINDOWS = {
"5min": 5 * 60,
"1h": 60 * 60,
}
@dataclass
class IncrementalCounter:
"""Счётчик, который умеет:
• `+=` увеличивает внутренний счётчик на 1
• `last_5min`, `last_hour`, `total` сколько было увеличений
за последние 5минут, 1час и за всё время соответственно
"""
    # Internal counter (sum of all increments)
_total: int = 0
    # History: a deque of timestamps (float) of each increment
_history: Deque[float] = field(default_factory=deque, init=False)
    # ---------- The += operator ----------
def __iadd__(self, other):
"""
При любом `+=` увеличиваем счётчик на 1.
Возвращаем self, чтобы поддерживать цепочку выражений.
"""
        # Always +1; `other` is ignored
self._total += 1
        # Store only the event time
self._history.append(time.monotonic())
        # Drop entries older than the longest window (1h)
self._purge_old_entries()
return self
    # ---------- Statistics properties ----------
@property
def total(self) -> int:
"""Общее количество прибавлений."""
return self._total
@property
def last_5min(self) -> int:
"""Сколько прибавлений было за последние 5 минут."""
return self._count_in_window(WINDOWS["5min"])
@property
def last_hour(self) -> int:
"""Сколько прибавлений было за последний час."""
return self._count_in_window(WINDOWS["1h"])
    # ---------- Helper methods ----------
def _purge_old_entries(self) -> None:
"""Удаляем из deque все записи старше 1 часа."""
cutoff = time.monotonic() - WINDOWS["1h"]
while self._history and self._history[0] < cutoff:
self._history.popleft()
def _count_in_window(self, seconds: float) -> int:
"""Подсчёт, сколько событий попадает в заданный интервал."""
cutoff = time.monotonic() - seconds
        # prune old entries that are no longer needed
while self._history and self._history[0] < cutoff:
self._history.popleft()
return len(self._history)
    # ---------- User-facing interface ----------
def __repr__(self):
return (
f"<IncrementalCounter total={self.total} "
f"5min={self.last_5min} 1h={self.last_hour}>"
)
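# Quick demonstration (illustrative, not part of the original file):
if __name__ == "__main__":
    counter = IncrementalCounter()
    for _ in range(3):
        counter += 1
    print(counter)  # <IncrementalCounter total=3 5min=3 1h=3>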


@@ -0,0 +1,23 @@
class ListsDict:
def __init__(self):
self._data: dict[str, list] = dict()
def add(self, key, value):
if key not in self._data: self._data[key] = list()
if value not in self._data[key]: self._data[key].append(value)
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = list()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.append(elem)
return res
def by_key(self, key):
return self._data.get(key, None)
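# Quick demonstration (illustrative, not part of the original file):
if __name__ == '__main__':
    d = ListsDict()
    d.add('tags', 'anime')
    d.add('tags', 'anime')  # duplicate values are ignored
    d.add('styles', 'anime')
    print(d.by_key('tags'))  # ['anime']
    print(d.index)           # ['anime'] - de-duplicated across all keys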


@@ -0,0 +1,25 @@
class SetsDict:
def __init__(self):
self._data: dict[str, set] = dict()
    def add(self, key, value):
        if key not in self._data: self._data[key] = set()
        self._data[key].add(value)  # sets de-duplicate on their own
def delete(self, key, value):
if self._data.get(key, None): self._data[key].remove(value)
@property
def index(self):
res = set()
for key, collection in self._data.items():
for elem in collection:
if elem not in res:
res.add(elem)
return res
def by_key(self, key):
return self._data.get(key, None)
@property
def keys(self): return self._data.keys()
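# Quick demonstration (illustrative, not part of the original file):
if __name__ == '__main__':
    s = SetsDict()
    s.add('models', 42)
    s.add('models', 42)  # already present, no effect
    s.add('images', 7)
    print(s.by_key('models'))  # {42}
    print(sorted(s.index))     # [7, 42]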


@@ -0,0 +1,11 @@
def format_bytes(bytes_size):
"""Convert bytes to human readable format"""
if bytes_size < 1024:
return f"{bytes_size} B"
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_size < 1024.0:
return f"{bytes_size:.1f} {unit}"
bytes_size /= 1024.0
return f"{bytes_size:.1f} PB"


@@ -0,0 +1,44 @@
def select_elements(lst, selection_string):
"""
Выбирает элементы из списка согласно строке выбора
Args:
lst: Исходный список
selection_string: Строка вида "1 2 4-6 all"
Returns:
Новый список с выбранными элементами, отсортированными по номерам
"""
    selection_string = selection_string.strip()
    if not selection_string:
        return []
if selection_string == "all":
return lst.copy()
selected_indices = set()
parts = selection_string.split()
for part in parts:
if '-' in part:
            # Handle a range
start, end = map(int, part.split('-'))
            # Ranges may be given in either direction
if start <= end:
selected_indices.update(range(start, end + 1))
else:
selected_indices.update(range(start, end - 1, -1))
else:
            # Handle a single index
selected_indices.add(int(part))
    # Sort the selected indices
sorted_indices = sorted(selected_indices)
    # Pick the elements
result = []
for idx in sorted_indices:
if 0 <= idx < len(lst):
result.append(lst[idx])
return result
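# Quick demonstration (illustrative, not part of the original file):
if __name__ == '__main__':
    items = ['a', 'b', 'c', 'd', 'e', 'f']
    print(select_elements(items, '0 2-4'))  # ['a', 'c', 'd', 'e']
    print(select_elements(items, '5-3'))    # ['d', 'e', 'f']
    print(select_elements(items, 'all'))    # copy of the whole list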

0 modules/ui/__init__.py Normal file


@@ -0,0 +1,131 @@
import os.path
import flet as ft
from Workspaces import *
MainWindowDarkTheme = {
'logo_large_path': os.path.join('assets', 'logo_dark_large.png'),
'logo_small_path': os.path.join('assets', 'logo_dark_small.png'),
'background_color': "#1f1e23",
'text_color': "#ffffff",
'accent_color': "#7B1FA2",
'icon': ft.Icons.NIGHTLIGHT_ROUNDED
}
MainWindowLightTheme = {
'logo_large_path': os.path.join('assets', 'logo_light_large.png'),
'logo_small_path': os.path.join('assets', 'logo_light_small.png'),
'background_color': "#ffffff",
'text_color': "#1f1e23",
'accent_color': "#9C27B0",
'icon': ft.Icons.SUNNY
}
class MainWindow:
def __init__(self):
self.title = 'Vaiola'
self.themes = [MainWindowDarkTheme, MainWindowLightTheme]
self.active_theme_number = 0
self.active_theme = self.themes[self.active_theme_number]
self.side_bar = MainWindowSideBar(self)
self.active_workspace = self.side_bar.get_active_workspace()
self.page: ft.Page | None = None
def build(self, page: ft.Page):
self.page = page
page.clean()
page.title = self.title
page.padding = 0
page.bgcolor = self.active_theme['background_color']
self.active_workspace = self.side_bar.get_active_workspace()
layout = ft.Row(controls=[self.side_bar.build(), ft.VerticalDivider(thickness=4, ), self.active_workspace.build()],
spacing=0,
vertical_alignment=ft.CrossAxisAlignment.START,
alignment=ft.MainAxisAlignment.START,
)
page.add(layout)
def set_theme(self):
self.active_theme_number += 1
if self.active_theme_number >= len(self.themes): self.active_theme_number = 0
self.active_theme = self.themes[self.active_theme_number]
self.side_bar.set_theme(self.active_theme)
self.rebuild()
def rebuild(self):
self.build(self.page)
self.page.update()
class MainWindowSideBar:
logo_dark_large_path = os.path.join('assets', 'logo_dark_large.png')
logo_dark_small_path = os.path.join('assets', 'logo_dark_small.png')
logo_light_large_path = os.path.join('assets', 'logo_light_large.png')
logo_light_small_path = os.path.join('assets', 'logo_light_small.png')
def __init__(self, parent):
self.tabs: list[MainWindowSideBarTab] = list()
self.active_tab: MainWindowSideBarTab = MainWindowSideBarTab()
self.extended = True
self.theme = parent.active_theme
self.parent = parent
def set_theme(self, theme): self.theme = theme
def get_active_workspace(self) -> Workspace: return self.active_tab.get_active_workspace()
def build(self) -> ft.Container:
logo = ft.Container(
content=ft.Image(
src=self.theme['logo_large_path'] if self.extended else self.theme['logo_small_path'],
width=200 if self.extended else 60,
fit=ft.ImageFit.CONTAIN
),
width=200 if self.extended else 60,
# on_click
)
theme_button = ft.Button(
content=ft.Row(
controls=[
ft.Icon(self.theme['icon'], color=self.theme['background_color']),
ft.Text('Переключить тему', color=self.theme['background_color'])
] if self.extended else [
ft.Icon(self.theme['icon'], color=self.theme['background_color'])
]
),
bgcolor=self.theme['text_color'],
on_click=self.switch_theme
)
settings = ft.Container(content=theme_button, padding=8)
layout = ft.Column(
controls=[logo, ft.Text('Область вкладок', color=self.theme['text_color']), settings]
)
return ft.Container(content=layout, width=200 if self.extended else 60,)
def switch_theme(self, e):
self.parent.set_theme()
class MainWindowSideBarTab:
def __init__(self):
self.workspace = Workspace()
def get_active_workspace(self) -> Workspace:
return self.workspace
if __name__ == "__main__":
ft.app(target=MainWindow().build)


@@ -0,0 +1,98 @@
# side_menu.py
import os.path
import flet as ft
class NavPanel:
"""Пустая панель навигации. Добавьте свои пункты позже."""
def __init__(self, page: ft.Page):
self.page = page
def build(self) -> ft.Container:
"""Возвращаем контейнер с кнопками навигации."""
# Пример пунктов замените/добавьте свои
return ft.Container(
padding=ft.padding.symmetric(vertical=10, horizontal=5),
content=ft.Column(
controls=[
ft.TextButton(
text="Главная",
icon=ft.Icons.HOME,
on_click=lambda e: self.page.views.append(ft.View("/home")),
style=ft.ButtonStyle(overlay_color=ft.Colors.GREY_200)
),
ft.TextButton(
text="Настройки",
icon=ft.Icons.SETTINGS,
on_click=lambda e: self.page.views.append(ft.View("/settings")),
style=ft.ButtonStyle(overlay_color=ft.Colors.GREY_200)
),
],
spacing=5,
),
)
class SideMenu:
"""
Класс, представляющий боковое меню.
На данный момент оно «пустое», но можно легко добавить пункты в будущем.
"""
def __init__(self):
        # Any initial data can be stored here
        self.width = 200  # menu width
self.bgcolor = ft.Colors.SURFACE
self.logo_path = os.path.join('assets', 'side_menu_logo_dark.png')
def build(self, page: ft.Page) -> ft.Container:
"""
Возвращает контейнер, который можно вставить в страницу.
"""
logo = ft.Image(
src=self.logo_path,
            width=self.width,  # stretch to the menu width
            fit=ft.ImageFit.CONTAIN,  # keep the aspect ratio
            # height may be omitted; Flet will size it automatically
)
        # 2. Navigation panel
nav_panel = NavPanel(page).build()
        # 3. Theme toggle button
def toggle_theme(e):
            # Switch the mode and refresh the page
page.theme_mode = (
ft.ThemeMode.DARK if page.theme_mode == ft.ThemeMode.LIGHT
else ft.ThemeMode.LIGHT
)
page.update()
toggle_btn = ft.TextButton(
text="Тёмная тема" if page.theme_mode == ft.ThemeMode.LIGHT else "Светлая тема",
icon=ft.Icons.BOOKMARK,
on_click=toggle_theme,
style=ft.ButtonStyle(
padding=ft.padding.all(10),
alignment=ft.alignment.center_left
)
)
        # 4. Put everything into a column aligned to the edges
return ft.Container(
width=self.width,
bgcolor=self.bgcolor,
padding=ft.padding.symmetric(vertical=15, horizontal=10),
content=ft.Column(
controls=[
logo,
ft.Divider(height=15),
nav_panel,
ft.Divider(height=15),
ft.Container(
content=toggle_btn,
alignment=ft.alignment.bottom_left
),
],
spacing=10,
alignment=ft.MainAxisAlignment.START,
),
)

242 modules/ui/gui/Sample.py Normal file

@@ -0,0 +1,242 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Fletприложение: боковая панель + рабочая область
(с учётом всех уточнений: исчезновение текста при сворачивании,
акцент только на активной группе/вкладке, иконка‑кнопка смены темы,
фиолетовый акцентный цвет).
"""
import flet as ft
# ---------- Image parameters ----------
LOGO_LIGHT_COLLAPSED = "logo_light_small.png"  # light logo, "small"
LOGO_LIGHT_EXPANDED = "logo_light_large.png"  # light logo, "large"
LOGO_DARK_COLLAPSED = "logo_dark_small.png"  # dark logo, "small"
LOGO_DARK_EXPANDED = "logo_dark_large.png"  # dark logo, "large"
# ---------- Colors ----------
LIGHT_BG = "#e0f7fa"
DARK_BG = "#263238"
LIGHT_ACC = "#9C27B0"  # purple 500
DARK_ACC = "#7B1FA2"  # purple 700
# ---------- Helper functions ----------
def status_lamp(color: str) -> ft.Icon:
return ft.Icon(ft.Icons.CIRCLE, size=12, color=color)
def group_icon(name: str) -> ft.Icon:
mapping = {
"Repository": ft.Icons.ARCHIVE,
"Environment": ft.Icons.FOLDER,
"Civit": ft.Icons.HEXAGON,
"Tasks": ft.Icons.DOWNLOAD,
}
return ft.Icon(mapping.get(name, ft.Icons.FOLDER), size=20)
def tab_icon() -> ft.Icon:
return ft.Icon(ft.Icons.FILE_COPY, size=30)
# ---------- Main entry point ----------
def main(page: ft.Page):
page.title = "Flet Sidebar Demo"
page.vertical_alignment = ft.MainAxisAlignment.START
page.horizontal_alignment = ft.CrossAxisAlignment.START
    # ----- State -----
is_expanded = True
group_expanded = {
"Repository": True,
"Environment": False,
"Civit": False,
"Tasks": False,
}
selected_group: str | None = None
selected_tab_name: str | None = None
selected_tab = ft.Text(value="Выберите вкладку", size=24, weight=ft.FontWeight.W_400)
    # ----- Logo -----
def get_logo_path() -> str:
if page.theme_mode == ft.ThemeMode.LIGHT:
return LOGO_LIGHT_EXPANDED if is_expanded else LOGO_LIGHT_COLLAPSED
else:
return LOGO_DARK_EXPANDED if is_expanded else LOGO_DARK_COLLAPSED
    # ----- Navigation sidebar -----
sidebar = ft.Container(
width=200 if is_expanded else 60,
bgcolor=ft.Colors.SURFACE,
padding=ft.padding.all(8),
content=ft.Column(spacing=8, controls=[]),
)
    # ----- Workspace -----
main_area = ft.Container(
expand=True,
padding=ft.padding.all(20),
content=ft.Container(
alignment=ft.alignment.center,
content=selected_tab
)
)
    # ----- Layout -----
page.add(ft.Row(controls=[sidebar, main_area], expand=True))
    # ----- Sidebar rebuild -----
def rebuild_sidebar():
"""Пересоздаёт содержимое боковой панели."""
controls = []
        # 1. Logo (clickable)
logo_img = ft.Image(
src=get_logo_path(),
width=50 if not is_expanded else 150,
height=50 if not is_expanded else 150,
fit=ft.ImageFit.CONTAIN
)
logo_container = ft.Container(
content=logo_img,
on_click=lambda e: toggle_sidebar()
)
controls.append(logo_container)
        # 2. Tab groups
groups = {
"Repository": ["Create", "Upload"],
"Environment": ["Create", "Upload", "Install"],
"Civit": ["Initialize", "Overview", "Selection"],
"Tasks": ["Upload"],
}
for grp_name, tabs in groups.items():
controls.append(build_group(grp_name, tabs))
        # 3. Theme switch button (icon only)
controls.append(ft.Container(height=20))
theme_icon = ft.Icons.SUNNY if page.theme_mode == ft.ThemeMode.LIGHT else ft.Icons.NIGHTLIGHT_ROUNDED
theme_btn = ft.IconButton(
icon=theme_icon,
on_click=lambda e: toggle_theme()
)
controls.append(theme_btn)
sidebar.content.controls = controls
page.update()
    # ----- Group + submenu -----
def build_group(name: str, tabs: list[str]) -> ft.Container:
"""Создаёт одну группу с подменю."""
# Фон заголовка только для активной группы
header_bg = LIGHT_ACC if selected_group == name else ft.Colors.TRANSPARENT
        # 1. First row: status lamp, icon, group name
title_row = ft.Row(
controls=[
status_lamp("#ffeb3b"),
group_icon(name),
ft.Text(name, weight=ft.FontWeight.BOLD, color=ft.Colors.ON_PRIMARY)
],
alignment=ft.MainAxisAlignment.START,
vertical_alignment=ft.CrossAxisAlignment.CENTER,
spacing=8,
)
        # 2. Second row: subtitle (shown only when the panel is expanded)
subtitle_row = ft.Row(
controls=[
ft.Text("Подстрока", size=10, color=ft.Colors.GREY)
],
alignment=ft.MainAxisAlignment.START,
vertical_alignment=ft.CrossAxisAlignment.CENTER,
spacing=8,
)
header_content = ft.Column(
controls=[title_row] + ([subtitle_row] if is_expanded else []),
spacing=2,
)
header = ft.Container(
padding=ft.padding.only(left=8, right=8, top=4, bottom=4),
bgcolor=header_bg,
border_radius=8,
content=header_content,
on_click=lambda e: toggle_group(name),
)
        # Tab list
tab_items = []
for tab_name in tabs:
icon = tab_icon()
title = ft.Text(tab_name, color=ft.Colors.ON_SURFACE_VARIANT)
if selected_tab_name == tab_name:
icon.color = LIGHT_ACC if page.theme_mode == ft.ThemeMode.LIGHT else DARK_ACC
title.color = LIGHT_ACC if page.theme_mode == ft.ThemeMode.LIGHT else DARK_ACC
row = ft.Row(
controls=[icon],
alignment=ft.MainAxisAlignment.START,
vertical_alignment=ft.CrossAxisAlignment.CENTER,
spacing=8
)
if is_expanded:
row.controls.append(title)
item = ft.Container(
content=row,
padding=ft.padding.only(left=16),
on_click=lambda e, t=tab_name, g=name: select_tab(g, t)
)
tab_items.append(item)
sublist = ft.Container(
content=ft.Column(controls=tab_items, spacing=0),
height=0 if not group_expanded[name] else len(tabs) * 48,
)
return ft.Column(
controls=[header, sublist],
spacing=4
)
    # ----- Events -----
def toggle_sidebar():
nonlocal is_expanded
is_expanded = not is_expanded
        sidebar.width = 200 if is_expanded else 60
rebuild_sidebar()
def toggle_group(name: str):
group_expanded[name] = not group_expanded[name]
rebuild_sidebar()
def select_tab(group: str, tab: str):
nonlocal selected_group, selected_tab_name
selected_group = group
selected_tab_name = tab
selected_tab.value = f"{tab}\n(Icon + text)"
rebuild_sidebar()
def toggle_theme():
if page.theme_mode == ft.ThemeMode.LIGHT:
page.theme_mode = ft.ThemeMode.DARK
page.bgcolor = DARK_BG
else:
page.theme_mode = ft.ThemeMode.LIGHT
page.bgcolor = LIGHT_BG
rebuild_sidebar()
page.update()
    # ----- Initialization -----
page.bgcolor = LIGHT_BG
rebuild_sidebar()
# ---------- Launch ----------
if __name__ == "__main__":
ft.app(target=main)


@@ -0,0 +1,7 @@
import flet as ft
class Workspace:
def __init__(self): pass
def build(self) -> ft.Container:
return ft.Container(content=ft.Text("Выберете вкладку"))


@@ -1,6 +1,6 @@
from dataclasses import dataclass
from pathlib import Path
from pythonapp.Libs.ConfigDataClass import Config
from modules.shared.ConfigDataClass import Config
class InstanceFileNaming:


@@ -1 +1,5 @@
colorama
requests
netifaces
ping3
flet


@@ -0,0 +1,142 @@
import json
import os.path
from modules.civit.Civit import Civit
from modules.civit.datamodel import Creator, Tag, Model
from modules.civit.fetch import Fetch
from modules.shared.DatabaseSqlite import SQLiteDatabase
from shell.Handlers.ABS import Handler
from modules.civit.client import Client
class CivitHandler(Handler):
def __init__(self):
super().__init__()
self.forwarding_table: dict[str, Handler] = {
'fetch': FetchHandler(self),
'save': SaveHandler(self)
}
self.handle_table: dict = {
'init': self._init,
}
self.client: Civit | None = None
def _init(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['path', 'key', 'dbpath'])
self._check_arg(keys, 'path')
if not keys['dbpath']: keys['dbpath'] = keys['path']
db = SQLiteDatabase(path=keys['dbpath'], name='data')
self.client = Civit(db, path=keys['path'], api_key=keys['key'])
self.succeed = True
class SaveHandler(Handler):
def __init__(self, parent):
super().__init__()
self.parent: CivitHandler = parent
self.forwarding_table: dict[str, Handler] = {
}
self.handle_table: dict = {
'creators': self._creators,
'tags': self._tags,
'models': self._models,
'images': self._images,
}
def _creators(self, command: list[str], pos=0):
self.parent.client.from_fetch('creator', Creator)
self.succeed = True
def _tags(self, command: list[str], pos=0):
self.parent.client.from_fetch('tag', Tag)
self.succeed = True
def _models(self, command: list[str], pos=0):
self.parent.client.from_fetch('model', Model)
self.succeed = True
    def _images(self, command: list[str], pos=0): raise NotImplementedError
class FetchHandler(Handler):
def __init__(self, parent):
super().__init__()
self.parent: CivitHandler = parent
self.forwarding_table: dict[str, Handler] = {
}
self.handle_table: dict = {
'creators_raw': self._creators_raw,
'creators': self._creators,
'tags_raw': self._tags_raw,
'tags': self._tags,
'models': self._models,
'images': self._images,
'datamodel': self._datamodel,
'load': self._load,
}
def _load(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['entity'])
self._check_arg(keys, 'entity')
res = Fetch.load(self.parent.client.client, keys['entity'])
for r in res: print(r)
self.succeed = True
def _creators_raw(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query'])
res = self.parent.client.client.get_creators_raw(page=keys['page'], limit=keys['limit'], query=keys['query'])
print(res)
self.succeed = True
def _creators(self, command: list[str], pos=0):
res = self.parent.client.fetcher.creators()
for r in res: print(r)
self.succeed = True
def _tags_raw(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query'])
res = self.parent.client.client.get_tags_raw(page=keys['page'], limit=keys['limit'], query=keys['query'])
print(res)
self.succeed = True
def _tags(self, command: list[str], pos=0):
res = self.parent.client.fetcher.tags()
for r in res: print(r)
self.succeed = True
def _models(self, command: list[str], pos=0):
res = Fetch.models(self.parent.client)
for r in res: print(r)
self.succeed = True
def _images(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['start'])
res = Fetch.images(self.parent.client.client, start_with=keys['start'])
for r in res: print(r)
self.succeed = True
def _datamodel(self, command: list[str], pos=0):
entities = {
'creator': 'fetch_creators',
'tag': 'fetch_tags',
'model': 'fetch_models',
'image': 'fetch_images',
}
keys, args = self.parse_arguments(command[pos:], ['entity', 'top', 'dump'])
self._check_arg(keys, 'entity')
if keys['entity'] in entities:
res = Fetch.datamodel(self.parent.client.client, entities[keys['entity']], keys['top'] or 10)
print(json.dumps(res, indent=2, ensure_ascii=False))
if keys['dump']:
with open(keys['dump'], 'w') as f:
json.dump(res, f, indent=2, ensure_ascii=False)
self.succeed = True


@@ -1,4 +1,5 @@
from shell.Handlers.ABS import Handler
from shell.Handlers.CivitHandler import CivitHandler
from shell.Handlers.PythonappHandler import PythonappHandler
from shell.Handlers.ModelSpaceHandler import ModelSpaceHandler
@@ -9,6 +10,7 @@ class GlobalHandler(Handler):
self.forwarding_table: dict[str, Handler] = {
'pythonapp': PythonappHandler(),
'modelspace': ModelSpaceHandler(),
'civit': CivitHandler(),
}
self.handle_table: dict = {
'tell': self._tell


@@ -1,5 +1,8 @@
from modelspace.ModelPackageSelector import format_bytes
from modelspace.ModelSpace import ModelSpace
from modules.civit.client import Client
from shell.Handlers.ABS import Handler
from modelspace.Repository import global_repo
@@ -17,18 +20,56 @@ class ModelSpaceHandler(Handler):
'load': self._load,
'list': self._list,
'debug': self._debug,
'add-to-collection': self._add_to_collection,
'init-civit': self._init_civit,
'pull-civit': self._pull_civit,
# 'show': self._show,
# 'activate': self._activate,
}
self._loaded_instances: dict[str, ModelSpace] = {}
self._active_instance: ModelSpace | None = None
self.client: Client | None = None
def _init_civit(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['path', 'key'])
self._check_arg(keys, 'path')
self.client = Client(keys['path'], keys['key'])
self.succeed = True
def _create_inter(self, command: list[str], pos=0):
global_repo.add_model_package_interactive()
self.succeed = True
def _add_to_collection(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['pkg', 'collection', 'category', 'ext'])
self._check_arg(keys, 'pkg')
self._check_arg(keys, 'collection')
if keys['ext']:
internal = False
category = None
else:
internal = True
category = keys['category']
global_repo.model_sub_repo.add_package_to_collection(keys['pkg'], keys['collection'], category, internal)
self.succeed = True
def _pull_civit(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['model', 'version', 'file'])
if keys['model']:
global_repo.model_sub_repo.pull_civit_package(self.client, keys['model'], keys['version'], keys['file'])
else:
            while True:
                model = input("Model ID:")
                if not model: break  # empty input ends the interactive loop
                global_repo.model_sub_repo.pull_civit_package(self.client, model)
self.succeed = True
def _load(self, command: list[str], pos = 0):
keys, args = self.parse_arguments(command[pos:], ['path', 'layout', 'name'])
self._check_arg(keys, 'path')
@@ -56,6 +97,7 @@ class ModelSpaceHandler(Handler):
# def _create(self, command: list[str], pos = 0):
# keys, args = self.parse_arguments(command[pos:], ['env', 'path', 'python'])
self.succeed = True
def _install(self, command: list[str], pos = 0):
keys, args = self.parse_arguments(command[pos:], ['answer'])