diff --git a/modules/__init__.py b/modules/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/civit/__init__.py b/modules/civit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/civit/client.py b/modules/civit/client.py new file mode 100644 index 0000000..f26e2ef --- /dev/null +++ b/modules/civit/client.py @@ -0,0 +1,111 @@ +import os +from dataclasses import dataclass +from pathlib import Path + +import requests +import time +from typing import Optional +from requests import Session + +from modelspace.Repository import Repository +from pythonapp.Libs.ConfigDataClass import Config + +@dataclass +class ClientConfig(Config): + api_key: str = '' + base_url: str = 'https://civitai.com/' + + +class Client: + def __init__(self, path, api_key: str): + self.path = path + os.makedirs(self.path, exist_ok=True) + + self.config = ClientConfig(str(Path(self.path) / 'config.json'), autosave=True) + if self.config.api_key == '': self.config.api_key = api_key + self.config.save() + + self._headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.config.api_key}'} + self.session = Session() + self.session.headers.update(self._headers) + pass + + def enroll_key(self, key: str): + self.config.api_key = key + self.config.save() + + + @staticmethod + def build_query_string(params): + """Build query string from dictionary of parameters + + Args: + params (dict): Dictionary of parameters + + Returns: + str: Query string in format '?param1=value1¶m2=value2' + """ + if not params: + return "" + + filtered_params = {k: v for k, v in params.items() if v is not None} + + if not filtered_params: + return "" + + query_parts = [] + for key, value in filtered_params.items(): + query_parts.append(f"{key}={value}") + + return "?" + "&".join(query_parts) + + def make_get_request(self, url: str, max_retries: int = 10, delay: float = 3.0, + timeout: int = 300, **kwargs) -> Optional[requests.Response]: + """ + Выполняет GET запрос с обработкой ошибок и повторными попытками + + Args: + url (str): URL для запроса + max_retries (int): Максимальное количество попыток (по умолчанию 3) + delay (float): Задержка между попытками в секундах (по умолчанию 1.0) + timeout (int): Таймаут запроса в секундах (по умолчанию 30) + **kwargs: Дополнительные аргументы для requests.get() + + Returns: + Optional[requests.Response]: Объект Response или None в случае ошибки + """ + session = self.session + for attempt in range(max_retries + 1): + try: + response = session.get(url, timeout=timeout, **kwargs) + response.raise_for_status() # Вызовет исключение для HTTP ошибок + return response + + except (requests.exceptions.RequestException, + requests.exceptions.ConnectionError, + requests.exceptions.Timeout) as e: + + if attempt == max_retries: + print(f"Не удалось выполнить запрос после {max_retries} попыток: {e}") + return None + + print(f"Попытка {attempt + 1} не удалась: {e}. Повтор через {delay} секунд...") + time.sleep(delay) + + return None + + + def get_creators_tags_raw(self, entity: str, page=None, limit = 200, query = None): + if not limit: limit = 200 + if entity not in {'creators', 'tags'}: raise ValueError('Not in types') + response = self.make_get_request( + url = self.config.base_url + 'api/v1/' + entity + self.build_query_string( + {'page': page, 'limit': limit, 'query': query} + ) + ) + return response.json() + + def get_creators_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('creators', page, limit, query) + def get_tags_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('tags', page, limit, query) + + diff --git a/modules/civit/datamodel.py b/modules/civit/datamodel.py new file mode 100644 index 0000000..e69de29 diff --git a/modules/civit/datamodel_base.py b/modules/civit/datamodel_base.py new file mode 100644 index 0000000..3722a2c --- /dev/null +++ b/modules/civit/datamodel_base.py @@ -0,0 +1,185 @@ +from dataclasses import dataclass, field, fields +from typing import Dict, List, Any, Optional, get_type_hints +import warnings + +# Определим базовый класс для удобного наследования +@dataclass +class ForwardingBase: + _forwarding: Dict[str, type] = field(default_factory=dict) + _key_field: str = 'key' # Поле, которое будет использоваться как ключ + fixed: bool = False + + # Скрытые поля для хранения данных + _key: Optional[str] = None + other_data: Optional[Dict[str, Any]] = None + + # Пример поля, которое будет использоваться в _forwarding + # Должно быть переопределено в дочерних классах + key: Optional[str] = None + + def __post_init__(self): + if self._key is not None: + self.key = self._key + + @property + def key(self) -> Optional[str]: + return self._key + + @key.setter + def key(self, value: str): + self._key = value + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'ForwardingBase': + # Создаем экземпляр класса + instance = cls() + instance.fixed = data.get('fixed', False) + instance.other_data = None + + # Список всех полей + excluded_fields = {f.name for f in fields(ForwardingBase)} + all_fields = {f.name for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')} + + # Обрабатываем поля из _forwarding + handled_keys = set() + field_values = {} + + for key, value in data.items(): + if key in handled_keys: + continue + + if key in instance._forwarding: + target_type = instance._forwarding[key] + if isinstance(value, dict): + # Обрабатываем словарь + sub_instance = target_type.from_dict(value) + field_values[key] = sub_instance + handled_keys.add(key) + elif isinstance(value, list): + # Обрабатываем список словарей + results = [] + for item in value: + if isinstance(item, dict): + sub_instance = target_type.from_dict(item) + results.append(sub_instance) + else: + # Если элемент не словарь, записываем в other_data + warnings.warn(f"Non-dict value {item} in list for field '{key}' will be added to 'other_data'") + if instance.other_data is None: + instance.other_data = {} + instance.other_data[key] = item # Сохраняем оригинал + field_values[key] = results + handled_keys.add(key) + else: + # Если не словарь и не список, тоже добавляем в other_data + warnings.warn(f"Non-dict/list value {value} for field '{key}' will be added to 'other_data'") + if instance.other_data is None: + instance.other_data = {} + instance.other_data[key] = value + else: + # Обычное поле + if key in all_fields: + field_values[key] = value + handled_keys.add(key) + else: + # Неизвестное поле, добавляем в other_data + warnings.warn(f"Unknown field '{key}', adding to 'other_data'") + if instance.other_data is None: + instance.other_data = {} + instance.other_data[key] = value + + # Заполняем обычные поля + for key, value in field_values.items(): + setattr(instance, key, value) + + # Устанавливаем ключ, если есть + if hasattr(instance, '_key_field') and instance._key_field in data: + instance.key = data[instance._key_field] + + # Проверяем флаг fixed и other_data + if instance.fixed and instance.other_data is not None: + raise ValueError("Cannot serialize with fixed=True and non-empty other_data") + + return instance + + def to_dict(self) -> Dict[str, Any]: + result = {} + excluded_fields = {f.name for f in fields(ForwardingBase)} + field_names = [f.name for f in fields(self) if f.name not in excluded_fields and not f.name.startswith('_')] + + for field_name in field_names: + if not hasattr(self, field_name): + result[field_name] = None + warnings.warn(f'object not have field {field_name}, something went wrong') + continue + value = getattr(self, field_name) + if not value: + result[field_name] = None + warnings.warn(f'object not have data in field {field_name}, it may be correct situation') + continue + + if field_name in self._forwarding: + target_type = self._forwarding[field_name] + result[field_name] = list() + single = False + if not isinstance(value, list): + single = True + value = [value] + for v in value: + try: + v = v.to_dict() + except Exception as e: + warnings.warn(str(e)) + finally: + result[field_name].append(v) + if single: result[field_name] = result[field_name][0] + continue + else: result[field_name] = value + + # Добавляем other_data, если есть + if self.other_data and isinstance(self.other_data, dict): + for key, value in self.other_data.items(): + if key not in result: + result[key] = value + else: + if not isinstance(result[key], list): result[key] = [result[key]] + if not isinstance(value, list): value = [value] + result[key].extend(value) + + return result + +# Пример использования: +@dataclass +class Person(ForwardingBase): + name: Optional[str] = None + age: Optional[int] = None + email: Optional[str] = None + + def __post_init__(self): + super().__post_init__() + self._forwarding = {} + self._key_field = 'name' + +@dataclass +class User(ForwardingBase): + id: Optional[list] = None + username: Optional[str] = None + person: Optional[Person] = None + + def __post_init__(self): + super().__post_init__() + self._forwarding = {'person': Person} + self._key_field = 'username' + +# Пример десериализации: +if __name__ == "__main__": + data = { + "id": [1,2,3,4,5,6], + "username": "user1", + "person": None, + "extra_field": "should_be_in_other_data" + } + + user = User.from_dict(data) + data2 = user.to_dict() + print(user.to_dict()) \ No newline at end of file diff --git a/modules/civit/fetch.py b/modules/civit/fetch.py new file mode 100644 index 0000000..f77494b --- /dev/null +++ b/modules/civit/fetch.py @@ -0,0 +1,433 @@ +import datetime +import json +import time +import os +from collections import defaultdict, Counter +from typing import Dict, List, Any, Tuple, Union +from pathlib import Path + + +from modules.civit.client import Client + + +class EntityAnalyzer: + def __init__(self): + self.field_analysis = {} + + def _get_json_files(self, directory_path: str) -> List[str]: + """Получает список всех JSON файлов в директории""" + json_files = [] + for filename in os.listdir(directory_path): + if filename.endswith('.json'): + json_files.append(os.path.join(directory_path, filename)) + return json_files + + def _load_json_data(self, file_path: str) -> List[Dict]: + """Загружает данные из JSON файла""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + if isinstance(data, list): + return data + else: + return [data] + except (json.JSONDecodeError, IOError) as e: + print(f"Ошибка чтения файла {file_path}: {e}") + return [] + + def _collect_all_entities(self, directory_path: str) -> List[Dict]: + """Собирает все экземпляры из всех JSON файлов""" + all_entities = [] + json_files = self._get_json_files(directory_path) + + for file_path in json_files: + entities = self._load_json_data(file_path) + all_entities.extend(entities) + + return all_entities + + def _get_field_types(self, value: Any) -> str: + """Определяет тип значения""" + if isinstance(value, dict): + return 'dict' + elif isinstance(value, list): + return 'list' + elif isinstance(value, bool): + return 'bool' + elif isinstance(value, int): + return 'int' + elif isinstance(value, float): + return 'float' + elif isinstance(value, str): + return 'str' + else: + return 'unknown' + + def _get_main_type(self, types: List[str]) -> str: + """Определяет основной тип из списка типов""" + if not types: + return 'unknown' + + # Если есть dict или list - это сложная структура + if 'dict' in types or 'list' in types: + return 'complex' + + # Иначе возвращаем первый тип (или объединяем) + unique_types = set(types) + if len(unique_types) == 1: + return types[0] + else: + return 'mixed' + + def _is_hashable(self, value: Any) -> bool: + """Проверяет, является ли значение хэшируемым""" + try: + hash(value) + return True + except TypeError: + return False + + def _serialize_value_for_counter(self, value: Any) -> str: + """Преобразует значение в строку для использования в Counter""" + if self._is_hashable(value): + return value + else: + # Для нехэшируемых типов используем строковое представление + return str(value) + + def _analyze_fields_recursive(self, entity: Dict, parent_path: str, + field_types: Dict, field_presence: Dict, + field_values: Dict, top_n: int): + """Рекурсивно анализирует поля сущности""" + if not isinstance(entity, dict): + return + + for key, value in entity.items(): + field_path = f"{parent_path}.{key}" if parent_path else key + + # Добавляем тип поля + field_types[field_path].append(self._get_field_types(value)) + + # Отмечаем наличие поля + field_presence[field_path].append(True) + + # Сохраняем значение для подсчета частоты (обрабатываем нехэшируемые типы) + if value is not None: + serialized_value = self._serialize_value_for_counter(value) + field_values[field_path].append(serialized_value) + + # Рекурсивно анализируем вложенные структуры + if isinstance(value, dict): + self._analyze_fields_recursive(value, field_path, field_types, + field_presence, field_values, top_n) + elif isinstance(value, list): + for item in value: + if isinstance(item, dict): + self._analyze_fields_recursive(item, field_path, field_types, + field_presence, field_values, top_n) + + def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]: + """Анализирует структуру всех сущностей""" + if not entities: + return {} + + # Собираем все поля и их типы + field_types = defaultdict(list) + field_presence = defaultdict(list) + field_values = defaultdict(list) + + for entity in entities: + self._analyze_fields_recursive(entity, "", field_types, field_presence, + field_values, top_n) + + # Формируем финальный анализ + result = {} + for field_path, types in field_types.items(): + # Определяем основной тип + main_type = self._get_main_type(types) + + # Подсчитываем частоту наличия поля + presence_count = len(field_presence[field_path]) + total_count = len(entities) + always_present = presence_count == total_count + + # Получаем топ N значений + top_values = [] + if field_path in field_values: + try: + # Преобразуем строки обратно в оригинальные типы для отображения + value_counter = Counter(field_values[field_path]) + top_values = [item[0] for item in value_counter.most_common(top_n)] + except Exception: + # Если возникла ошибка, используем пустой список + top_values = [] + + result[field_path] = { + 'type': main_type, + 'always_present': always_present, + 'top_values': top_values, + 'total_count': total_count, + 'presence_count': presence_count + } + + return result + + def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]: + """ + Основной метод анализа директории + + Args: + directory_path: Путь к директории с JSON файлами + top_n: Количество самых частых значений для каждого поля + + Returns: + Словарь с анализом структуры данных + """ + # Шаг 1: Собираем все экземпляры из JSON файлов + entities = self._collect_all_entities(directory_path) + + # Шаг 2: Анализируем структуру сущностей + self.field_analysis = self._analyze_entity_structure(entities, top_n) + + return self.field_analysis + + + + +class Fetch: + + @staticmethod + def load_json_dir(directory_path): + """ + Получает путь к директории, находит в ней все файлы json, + читает из них списки словарей и возвращает один список со всеми словарями + + Args: + directory_path (str): Путь к директории с JSON файлами + + Returns: + list: Список всех словарей из всех JSON файлов + """ + all_dicts = [] + files = os.listdir(directory_path) + + # Проходим по всем файлам в директории + for filename in files: + if filename.endswith('.json'): + file_path = os.path.join(directory_path, filename) + + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + # Если данные - список словарей + if isinstance(data, list): + all_dicts.extend(data) + # Если данные - один словарь + elif isinstance(data, dict): + all_dicts.append(data) + + except (json.JSONDecodeError, IOError) as e: + print(f"Ошибка чтения файла {filename}: {e}") + continue + + return all_dicts + + entities = { + 'creator': 'fetch_creators', + 'creators': 'fetch_creators', + 'tag': 'fetch_tags', + 'tags': 'fetch_tags', + 'model': 'fetch_models', + 'models': 'fetch_models', + 'image': 'fetch_images', + 'images': 'fetch_images', + } + + @classmethod + def load(cls, client: Client, entity_type: str): + if entity_type in cls.entities: subdir = cls.entities[entity_type] + else: raise ValueError(f'Civit doesn\'t have entity type {entity_type}') + res = cls.load_json_dir(str(Path(client.path) / subdir)) + return res + + + @classmethod + def datamodel(cls, client: Client, subdir, top = None): + if not top: top = 10 + path = Path(client.path) / subdir + datamodel = EntityAnalyzer().analyze_directory(path, top_n=top) + return datamodel + + @classmethod + def _save_json(cls, path, items): + with open(path, 'w') as f: + json.dump(items, f, indent=2, ensure_ascii=False) + + @classmethod + def _paginated_crawler_parse_metadata(cls, page): + metadata = page.get('metadata', None) + if not metadata: raise RuntimeError("Unable to find metadata") + total_pages = metadata.get('totalPages', None) + current_page = metadata.get('currentPage', None) + if not total_pages or not current_page: RuntimeError("Unable to parse metadata") + print(f"Found! Total pages: {total_pages}") + return total_pages, current_page + + @classmethod + def _paginated_crawler(cls, client: Client, entity: str, save = True): + items = list() + print(f"Fetching {entity}...") + path = Path(client.path) / ('fetch_' + entity) + first_page = client.get_creators_tags_raw(entity) + if first_page.get('items', None): items.extend(first_page.get('items', None)) + if save: + path.mkdir(exist_ok=True) + cls._save_json(path / 'first.json', items) + total_pages, current_page = cls._paginated_crawler_parse_metadata(first_page) + for i in range(2, total_pages + 1): + print(f"Fetching page {i} of {total_pages}") + page = client.get_creators_tags_raw(entity, page=i) + time.sleep(3) + page_items = page.get('items', None) + if not page_items: continue + items.extend(page_items) + if save: cls._save_json(path / f'page_{i}.json', page_items) + + if save: cls._save_json(path / f'all.json', items) + return items + + @classmethod + def creators(cls, client: Client, subdir = 'fetch_creators', save = True): + return cls._paginated_crawler(client, 'creators', save) + + @classmethod + def tags(cls, client: Client, subdir='fetch_tags', save=True): + return cls._paginated_crawler(client, 'tags', save) + + @classmethod + def _cursor_crawler_parse_metadata(cls, page): + metadata = page.get('metadata', None) + if not metadata: raise RuntimeError("Unable to find metadata") + next_page = metadata.get('nextPage', None) + next_cursor = metadata.get('nextCursor', None) + if not next_page or not next_cursor: RuntimeError("Unable to parse metadata") + return next_page, next_cursor + + @classmethod + def _cursor_crawler_avoid_slip(cls, client: Client, url, path, entity, slip_retries = 5, chill_time = 3): + slip_counter = 0 + while True: + page = client.make_get_request(url).json() + try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(page) + except RuntimeError: return page + if slip_counter >= slip_retries: break + if next_page == url: + slip_counter = slip_counter + 1 + with open(Path(path) / 'slip.log', 'a') as file: file.write(f'{url}\n') + continue + else: + return page + + if entity not in {'models'}: raise RuntimeError("Slip detected! Avoiding failed: NotImplemented") + + split = url.rsplit('.', 1) + prefix = split[0] + '.' + split = split[1].split('%', 1) + suffix = '%' + split[1] + num = int(split[0]) + if num < 999: + num = num + 1 + else: + raise RuntimeError("Slip avoiding failed: Number overflow") + url = prefix + f'{num:03d}' + suffix + page = client.make_get_request(url).json() + next_page, next_cursor = cls._paginated_crawler_parse_metadata(page) + if next_page != url: return page + else: raise RuntimeError("Slip avoiding failed: Not effective") + + + @classmethod + def _cursor_crawler(cls, client: Client, entity: str, params: dict, save = True): + print(f"Fetching {entity}...") + path = Path(client.path) / ('fetch_' + entity) + items = list() + first_page = client.make_get_request(url=f'{client.config.base_url}/api/v1/{entity}{client.build_query_string(params)}').json() + if first_page.get('items', None): items.extend(first_page.get('items', None)) + if save: + path.mkdir(exist_ok=True) + cls._save_json(path / 'first.json', items) + try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(first_page) + except RuntimeError: return items + while next_page: + time.sleep(3) + page = cls._cursor_crawler_avoid_slip(client, next_page, path, entity) + page_items = page.get('items', None) + if not page_items: + with open(Path(client.path)/'bugs.log', 'a') as f: f.write(next_page + '\n') + return items + l = len(items) + items.extend(page_items) + print(f"Fetched {len(items) - l}/{len(page_items)} {entity} from page {next_page}") + + if save: cls._save_json(path / f'page_{next_cursor}.json', page_items) + try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(page) + except RuntimeError: break + + if save: cls._save_json(path / f'all.json', items) + return items + + + + + + + + + + + @classmethod + def models(cls, client: Client, subdir='fetch_models', save=True): + return cls._cursor_crawler(client, 'models', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw':'true'}, save) + + @classmethod + def images(cls, client: Client, subdir='fetch_images', save=True, start_with = None): + items = list() + if not start_with: start_with = 0 + path = Path(client.path) / ('fetch_' + 'images') + if save: path.mkdir(exist_ok=True) + creators = [c.get('username', None) for c in cls.load(client, 'creators')] + counter = 1 + int(start_with) + + for username in creators[int(start_with):]: + # for username in ['yonne']: + time.sleep(3) + if not username: continue + page_items = cls._cursor_crawler(client, 'images', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw':'X', 'username': username, 'limit': '200'}, save=False) + + if len(page_items) >= 49000: + with open(path / '_giants.log', 'a') as f: f.write(username + '\n') + print(f'Giant {username} has more then {len(page_items)} images, starting deep scan') + page_items_dict = dict() + for item in page_items: page_items_dict[item['id']] = item + print(f'Transferred {len(page_items_dict)} images of {len(page_items)}') + for sort in ['Newest', 'Most Reactions', 'Most Comments', 'Most Collected']: + page_items = cls._cursor_crawler(client, 'images', + {'period': 'AllTime', 'sort': sort, 'nsfw': 'X', + 'username': username, 'limit': '200'}, save=False) + l = len(page_items_dict) + for item in page_items: page_items_dict[item['id']] = item + print(f'Added {len(page_items_dict) - l} images by {sort} sort crawl. {len(page_items_dict)} images total') + + page_items = [key for key, value in page_items_dict.items()] + + + l = len(items) + #items.extend(page_items) + print(f"Fetched {len(page_items)} images by {username} ({counter}/{len(creators)})") + counter = counter + 1 + + if save: cls._save_json(path / f'{username}.json', page_items) + + #if save: cls._save_json(path / 'aaa.json', items) + return items \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3d90aaa..9f40479 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -colorama \ No newline at end of file +colorama +requests \ No newline at end of file diff --git a/shell/Handlers/CivitHandler.py b/shell/Handlers/CivitHandler.py new file mode 100644 index 0000000..a28cfc6 --- /dev/null +++ b/shell/Handlers/CivitHandler.py @@ -0,0 +1,111 @@ +import json + +from modules.civit.fetch import Fetch +from shell.Handlers.ABS import Handler +from modules.civit.client import Client + +class CivitHandler(Handler): + + + + def __init__(self): + super().__init__() + self.forwarding_table: dict[str, Handler] = { + 'fetch': FetchHandler(self) + } + self.handle_table: dict = { + 'init': self._init, + } + self.client: Client | None = None + + + def _init(self, command: list[str], pos=0): + keys, args = self.parse_arguments(command[pos:], ['path', 'key']) + self._check_arg(keys, 'path') + self.client = Client(keys['path'], keys['key']) + self.succeed = True + + +class FetchHandler(Handler): + + + + def __init__(self, parent): + super().__init__() + self.parent: CivitHandler = parent + self.forwarding_table: dict[str, Handler] = { + } + self.handle_table: dict = { + 'creators_raw': self._creators_raw, + 'creators': self._creators, + 'tags_raw': self._tags_raw, + 'tags': self._tags, + 'models': self._models, + 'images': self._images, + 'datamodel': self._datamodel, + 'load': self._load, + + } + + def _load(self, command: list[str], pos=0): + keys, args = self.parse_arguments(command[pos:], ['entity']) + self._check_arg(keys, 'entity') + + res = Fetch.load(self.parent.client, keys['entity']) + for r in res: print(r) + self.succeed = True + + + + def _creators_raw(self, command: list[str], pos=0): + keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query']) + res = self.parent.client.get_creators_raw(page=keys['page'], limit=keys['limit'], query=keys['query']) + print(res) + self.succeed = True + + def _creators(self, command: list[str], pos=0): + res = Fetch.creators(self.parent.client) + for r in res: print(r) + self.succeed = True + + + def _tags_raw(self, command: list[str], pos=0): + keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query']) + res = self.parent.client.get_tags_raw(page=keys['page'], limit=keys['limit'], query=keys['query']) + print(res) + self.succeed = True + + def _tags(self, command: list[str], pos=0): + res = Fetch.tags(self.parent.client) + for r in res: print(r) + self.succeed = True + + def _models(self, command: list[str], pos=0): + res = Fetch.models(self.parent.client) + for r in res: print(r) + self.succeed = True + + def _images(self, command: list[str], pos=0): + keys, args = self.parse_arguments(command[pos:], ['start']) + res = Fetch.images(self.parent.client, start_with=keys['start']) + for r in res: print(r) + self.succeed = True + + def _datamodel(self, command: list[str], pos=0): + entities = { + 'creator': 'fetch_creators', + 'tag': 'fetch_tags', + 'model': 'fetch_models', + 'image': 'fetch_images', + } + keys, args = self.parse_arguments(command[pos:], ['entity', 'top', 'dump']) + self._check_arg(keys, 'entity') + if keys['entity'] in entities: + res = Fetch.datamodel(self.parent.client, entities[keys['entity']], keys['top'] or 10) + print(json.dumps(res, indent=2, ensure_ascii=False)) + if keys['dump']: + with open(keys['dump'], 'w') as f: + json.dump(res, f, indent=2, ensure_ascii=False) + + + self.succeed = True diff --git a/shell/Handlers/GlobalHandler.py b/shell/Handlers/GlobalHandler.py index 4431c1d..13b8a3c 100644 --- a/shell/Handlers/GlobalHandler.py +++ b/shell/Handlers/GlobalHandler.py @@ -1,4 +1,5 @@ from shell.Handlers.ABS import Handler +from shell.Handlers.CivitHandler import CivitHandler from shell.Handlers.PythonappHandler import PythonappHandler from shell.Handlers.ModelSpaceHandler import ModelSpaceHandler @@ -9,6 +10,7 @@ class GlobalHandler(Handler): self.forwarding_table: dict[str, Handler] = { 'pythonapp': PythonappHandler(), 'modelspace': ModelSpaceHandler(), + 'civit': CivitHandler(), } self.handle_table: dict = { 'tell': self._tell