Add civit module

Add civit.fetch function
This commit is contained in:
2025-09-19 18:12:57 +07:00
parent 70e6e7a00b
commit 817283034a
9 changed files with 844 additions and 1 deletions

0
modules/__init__.py Normal file
View File

View File

111
modules/civit/client.py Normal file
View File

@@ -0,0 +1,111 @@
import os
from dataclasses import dataclass
from pathlib import Path
import requests
import time
from typing import Optional
from requests import Session
from modelspace.Repository import Repository
from pythonapp.Libs.ConfigDataClass import Config
@dataclass
class ClientConfig(Config):
api_key: str = ''
base_url: str = 'https://civitai.com/'
class Client:
def __init__(self, path, api_key: str):
self.path = path
os.makedirs(self.path, exist_ok=True)
self.config = ClientConfig(str(Path(self.path) / 'config.json'), autosave=True)
if self.config.api_key == '': self.config.api_key = api_key
self.config.save()
self._headers = {'Content-Type': 'application/json', 'Authorization': f'Bearer {self.config.api_key}'}
self.session = Session()
self.session.headers.update(self._headers)
pass
def enroll_key(self, key: str):
self.config.api_key = key
self.config.save()
@staticmethod
def build_query_string(params):
"""Build query string from dictionary of parameters
Args:
params (dict): Dictionary of parameters
Returns:
str: Query string in format '?param1=value1&param2=value2'
"""
if not params:
return ""
filtered_params = {k: v for k, v in params.items() if v is not None}
if not filtered_params:
return ""
query_parts = []
for key, value in filtered_params.items():
query_parts.append(f"{key}={value}")
return "?" + "&".join(query_parts)
def make_get_request(self, url: str, max_retries: int = 10, delay: float = 3.0,
timeout: int = 300, **kwargs) -> Optional[requests.Response]:
"""
Выполняет GET запрос с обработкой ошибок и повторными попытками
Args:
url (str): URL для запроса
max_retries (int): Максимальное количество попыток (по умолчанию 3)
delay (float): Задержка между попытками в секундах (по умолчанию 1.0)
timeout (int): Таймаут запроса в секундах (по умолчанию 30)
**kwargs: Дополнительные аргументы для requests.get()
Returns:
Optional[requests.Response]: Объект Response или None в случае ошибки
"""
session = self.session
for attempt in range(max_retries + 1):
try:
response = session.get(url, timeout=timeout, **kwargs)
response.raise_for_status() # Вызовет исключение для HTTP ошибок
return response
except (requests.exceptions.RequestException,
requests.exceptions.ConnectionError,
requests.exceptions.Timeout) as e:
if attempt == max_retries:
print(f"Не удалось выполнить запрос после {max_retries} попыток: {e}")
return None
print(f"Попытка {attempt + 1} не удалась: {e}. Повтор через {delay} секунд...")
time.sleep(delay)
return None
def get_creators_tags_raw(self, entity: str, page=None, limit = 200, query = None):
if not limit: limit = 200
if entity not in {'creators', 'tags'}: raise ValueError('Not in types')
response = self.make_get_request(
url = self.config.base_url + 'api/v1/' + entity + self.build_query_string(
{'page': page, 'limit': limit, 'query': query}
)
)
return response.json()
def get_creators_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('creators', page, limit, query)
def get_tags_raw(self, page=None, limit = 200, query = None): return self.get_creators_tags_raw('tags', page, limit, query)

View File

View File

@@ -0,0 +1,185 @@
from dataclasses import dataclass, field, fields
from typing import Dict, List, Any, Optional, get_type_hints
import warnings
# Определим базовый класс для удобного наследования
@dataclass
class ForwardingBase:
_forwarding: Dict[str, type] = field(default_factory=dict)
_key_field: str = 'key' # Поле, которое будет использоваться как ключ
fixed: bool = False
# Скрытые поля для хранения данных
_key: Optional[str] = None
other_data: Optional[Dict[str, Any]] = None
# Пример поля, которое будет использоваться в _forwarding
# Должно быть переопределено в дочерних классах
key: Optional[str] = None
def __post_init__(self):
if self._key is not None:
self.key = self._key
@property
def key(self) -> Optional[str]:
return self._key
@key.setter
def key(self, value: str):
self._key = value
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ForwardingBase':
# Создаем экземпляр класса
instance = cls()
instance.fixed = data.get('fixed', False)
instance.other_data = None
# Список всех полей
excluded_fields = {f.name for f in fields(ForwardingBase)}
all_fields = {f.name for f in fields(cls) if f.name not in excluded_fields and not f.name.startswith('_')}
# Обрабатываем поля из _forwarding
handled_keys = set()
field_values = {}
for key, value in data.items():
if key in handled_keys:
continue
if key in instance._forwarding:
target_type = instance._forwarding[key]
if isinstance(value, dict):
# Обрабатываем словарь
sub_instance = target_type.from_dict(value)
field_values[key] = sub_instance
handled_keys.add(key)
elif isinstance(value, list):
# Обрабатываем список словарей
results = []
for item in value:
if isinstance(item, dict):
sub_instance = target_type.from_dict(item)
results.append(sub_instance)
else:
# Если элемент не словарь, записываем в other_data
warnings.warn(f"Non-dict value {item} in list for field '{key}' will be added to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = item # Сохраняем оригинал
field_values[key] = results
handled_keys.add(key)
else:
# Если не словарь и не список, тоже добавляем в other_data
warnings.warn(f"Non-dict/list value {value} for field '{key}' will be added to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
else:
# Обычное поле
if key in all_fields:
field_values[key] = value
handled_keys.add(key)
else:
# Неизвестное поле, добавляем в other_data
warnings.warn(f"Unknown field '{key}', adding to 'other_data'")
if instance.other_data is None:
instance.other_data = {}
instance.other_data[key] = value
# Заполняем обычные поля
for key, value in field_values.items():
setattr(instance, key, value)
# Устанавливаем ключ, если есть
if hasattr(instance, '_key_field') and instance._key_field in data:
instance.key = data[instance._key_field]
# Проверяем флаг fixed и other_data
if instance.fixed and instance.other_data is not None:
raise ValueError("Cannot serialize with fixed=True and non-empty other_data")
return instance
def to_dict(self) -> Dict[str, Any]:
result = {}
excluded_fields = {f.name for f in fields(ForwardingBase)}
field_names = [f.name for f in fields(self) if f.name not in excluded_fields and not f.name.startswith('_')]
for field_name in field_names:
if not hasattr(self, field_name):
result[field_name] = None
warnings.warn(f'object not have field {field_name}, something went wrong')
continue
value = getattr(self, field_name)
if not value:
result[field_name] = None
warnings.warn(f'object not have data in field {field_name}, it may be correct situation')
continue
if field_name in self._forwarding:
target_type = self._forwarding[field_name]
result[field_name] = list()
single = False
if not isinstance(value, list):
single = True
value = [value]
for v in value:
try:
v = v.to_dict()
except Exception as e:
warnings.warn(str(e))
finally:
result[field_name].append(v)
if single: result[field_name] = result[field_name][0]
continue
else: result[field_name] = value
# Добавляем other_data, если есть
if self.other_data and isinstance(self.other_data, dict):
for key, value in self.other_data.items():
if key not in result:
result[key] = value
else:
if not isinstance(result[key], list): result[key] = [result[key]]
if not isinstance(value, list): value = [value]
result[key].extend(value)
return result
# Пример использования:
@dataclass
class Person(ForwardingBase):
name: Optional[str] = None
age: Optional[int] = None
email: Optional[str] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {}
self._key_field = 'name'
@dataclass
class User(ForwardingBase):
id: Optional[list] = None
username: Optional[str] = None
person: Optional[Person] = None
def __post_init__(self):
super().__post_init__()
self._forwarding = {'person': Person}
self._key_field = 'username'
# Пример десериализации:
if __name__ == "__main__":
data = {
"id": [1,2,3,4,5,6],
"username": "user1",
"person": None,
"extra_field": "should_be_in_other_data"
}
user = User.from_dict(data)
data2 = user.to_dict()
print(user.to_dict())

433
modules/civit/fetch.py Normal file
View File

@@ -0,0 +1,433 @@
import datetime
import json
import time
import os
from collections import defaultdict, Counter
from typing import Dict, List, Any, Tuple, Union
from pathlib import Path
from modules.civit.client import Client
class EntityAnalyzer:
def __init__(self):
self.field_analysis = {}
def _get_json_files(self, directory_path: str) -> List[str]:
"""Получает список всех JSON файлов в директории"""
json_files = []
for filename in os.listdir(directory_path):
if filename.endswith('.json'):
json_files.append(os.path.join(directory_path, filename))
return json_files
def _load_json_data(self, file_path: str) -> List[Dict]:
"""Загружает данные из JSON файла"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, list):
return data
else:
return [data]
except (json.JSONDecodeError, IOError) as e:
print(f"Ошибка чтения файла {file_path}: {e}")
return []
def _collect_all_entities(self, directory_path: str) -> List[Dict]:
"""Собирает все экземпляры из всех JSON файлов"""
all_entities = []
json_files = self._get_json_files(directory_path)
for file_path in json_files:
entities = self._load_json_data(file_path)
all_entities.extend(entities)
return all_entities
def _get_field_types(self, value: Any) -> str:
"""Определяет тип значения"""
if isinstance(value, dict):
return 'dict'
elif isinstance(value, list):
return 'list'
elif isinstance(value, bool):
return 'bool'
elif isinstance(value, int):
return 'int'
elif isinstance(value, float):
return 'float'
elif isinstance(value, str):
return 'str'
else:
return 'unknown'
def _get_main_type(self, types: List[str]) -> str:
"""Определяет основной тип из списка типов"""
if not types:
return 'unknown'
# Если есть dict или list - это сложная структура
if 'dict' in types or 'list' in types:
return 'complex'
# Иначе возвращаем первый тип (или объединяем)
unique_types = set(types)
if len(unique_types) == 1:
return types[0]
else:
return 'mixed'
def _is_hashable(self, value: Any) -> bool:
"""Проверяет, является ли значение хэшируемым"""
try:
hash(value)
return True
except TypeError:
return False
def _serialize_value_for_counter(self, value: Any) -> str:
"""Преобразует значение в строку для использования в Counter"""
if self._is_hashable(value):
return value
else:
# Для нехэшируемых типов используем строковое представление
return str(value)
def _analyze_fields_recursive(self, entity: Dict, parent_path: str,
field_types: Dict, field_presence: Dict,
field_values: Dict, top_n: int):
"""Рекурсивно анализирует поля сущности"""
if not isinstance(entity, dict):
return
for key, value in entity.items():
field_path = f"{parent_path}.{key}" if parent_path else key
# Добавляем тип поля
field_types[field_path].append(self._get_field_types(value))
# Отмечаем наличие поля
field_presence[field_path].append(True)
# Сохраняем значение для подсчета частоты (обрабатываем нехэшируемые типы)
if value is not None:
serialized_value = self._serialize_value_for_counter(value)
field_values[field_path].append(serialized_value)
# Рекурсивно анализируем вложенные структуры
if isinstance(value, dict):
self._analyze_fields_recursive(value, field_path, field_types,
field_presence, field_values, top_n)
elif isinstance(value, list):
for item in value:
if isinstance(item, dict):
self._analyze_fields_recursive(item, field_path, field_types,
field_presence, field_values, top_n)
def _analyze_entity_structure(self, entities: List[Dict], top_n: int) -> Dict[str, Any]:
"""Анализирует структуру всех сущностей"""
if not entities:
return {}
# Собираем все поля и их типы
field_types = defaultdict(list)
field_presence = defaultdict(list)
field_values = defaultdict(list)
for entity in entities:
self._analyze_fields_recursive(entity, "", field_types, field_presence,
field_values, top_n)
# Формируем финальный анализ
result = {}
for field_path, types in field_types.items():
# Определяем основной тип
main_type = self._get_main_type(types)
# Подсчитываем частоту наличия поля
presence_count = len(field_presence[field_path])
total_count = len(entities)
always_present = presence_count == total_count
# Получаем топ N значений
top_values = []
if field_path in field_values:
try:
# Преобразуем строки обратно в оригинальные типы для отображения
value_counter = Counter(field_values[field_path])
top_values = [item[0] for item in value_counter.most_common(top_n)]
except Exception:
# Если возникла ошибка, используем пустой список
top_values = []
result[field_path] = {
'type': main_type,
'always_present': always_present,
'top_values': top_values,
'total_count': total_count,
'presence_count': presence_count
}
return result
def analyze_directory(self, directory_path: str, top_n: int = 10) -> Dict[str, Any]:
"""
Основной метод анализа директории
Args:
directory_path: Путь к директории с JSON файлами
top_n: Количество самых частых значений для каждого поля
Returns:
Словарь с анализом структуры данных
"""
# Шаг 1: Собираем все экземпляры из JSON файлов
entities = self._collect_all_entities(directory_path)
# Шаг 2: Анализируем структуру сущностей
self.field_analysis = self._analyze_entity_structure(entities, top_n)
return self.field_analysis
class Fetch:
@staticmethod
def load_json_dir(directory_path):
"""
Получает путь к директории, находит в ней все файлы json,
читает из них списки словарей и возвращает один список со всеми словарями
Args:
directory_path (str): Путь к директории с JSON файлами
Returns:
list: Список всех словарей из всех JSON файлов
"""
all_dicts = []
files = os.listdir(directory_path)
# Проходим по всем файлам в директории
for filename in files:
if filename.endswith('.json'):
file_path = os.path.join(directory_path, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Если данные - список словарей
if isinstance(data, list):
all_dicts.extend(data)
# Если данные - один словарь
elif isinstance(data, dict):
all_dicts.append(data)
except (json.JSONDecodeError, IOError) as e:
print(f"Ошибка чтения файла {filename}: {e}")
continue
return all_dicts
entities = {
'creator': 'fetch_creators',
'creators': 'fetch_creators',
'tag': 'fetch_tags',
'tags': 'fetch_tags',
'model': 'fetch_models',
'models': 'fetch_models',
'image': 'fetch_images',
'images': 'fetch_images',
}
@classmethod
def load(cls, client: Client, entity_type: str):
if entity_type in cls.entities: subdir = cls.entities[entity_type]
else: raise ValueError(f'Civit doesn\'t have entity type {entity_type}')
res = cls.load_json_dir(str(Path(client.path) / subdir))
return res
@classmethod
def datamodel(cls, client: Client, subdir, top = None):
if not top: top = 10
path = Path(client.path) / subdir
datamodel = EntityAnalyzer().analyze_directory(path, top_n=top)
return datamodel
@classmethod
def _save_json(cls, path, items):
with open(path, 'w') as f:
json.dump(items, f, indent=2, ensure_ascii=False)
@classmethod
def _paginated_crawler_parse_metadata(cls, page):
metadata = page.get('metadata', None)
if not metadata: raise RuntimeError("Unable to find metadata")
total_pages = metadata.get('totalPages', None)
current_page = metadata.get('currentPage', None)
if not total_pages or not current_page: RuntimeError("Unable to parse metadata")
print(f"Found! Total pages: {total_pages}")
return total_pages, current_page
@classmethod
def _paginated_crawler(cls, client: Client, entity: str, save = True):
items = list()
print(f"Fetching {entity}...")
path = Path(client.path) / ('fetch_' + entity)
first_page = client.get_creators_tags_raw(entity)
if first_page.get('items', None): items.extend(first_page.get('items', None))
if save:
path.mkdir(exist_ok=True)
cls._save_json(path / 'first.json', items)
total_pages, current_page = cls._paginated_crawler_parse_metadata(first_page)
for i in range(2, total_pages + 1):
print(f"Fetching page {i} of {total_pages}")
page = client.get_creators_tags_raw(entity, page=i)
time.sleep(3)
page_items = page.get('items', None)
if not page_items: continue
items.extend(page_items)
if save: cls._save_json(path / f'page_{i}.json', page_items)
if save: cls._save_json(path / f'all.json', items)
return items
@classmethod
def creators(cls, client: Client, subdir = 'fetch_creators', save = True):
return cls._paginated_crawler(client, 'creators', save)
@classmethod
def tags(cls, client: Client, subdir='fetch_tags', save=True):
return cls._paginated_crawler(client, 'tags', save)
@classmethod
def _cursor_crawler_parse_metadata(cls, page):
metadata = page.get('metadata', None)
if not metadata: raise RuntimeError("Unable to find metadata")
next_page = metadata.get('nextPage', None)
next_cursor = metadata.get('nextCursor', None)
if not next_page or not next_cursor: RuntimeError("Unable to parse metadata")
return next_page, next_cursor
@classmethod
def _cursor_crawler_avoid_slip(cls, client: Client, url, path, entity, slip_retries = 5, chill_time = 3):
slip_counter = 0
while True:
page = client.make_get_request(url).json()
try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(page)
except RuntimeError: return page
if slip_counter >= slip_retries: break
if next_page == url:
slip_counter = slip_counter + 1
with open(Path(path) / 'slip.log', 'a') as file: file.write(f'{url}\n')
continue
else:
return page
if entity not in {'models'}: raise RuntimeError("Slip detected! Avoiding failed: NotImplemented")
split = url.rsplit('.', 1)
prefix = split[0] + '.'
split = split[1].split('%', 1)
suffix = '%' + split[1]
num = int(split[0])
if num < 999:
num = num + 1
else:
raise RuntimeError("Slip avoiding failed: Number overflow")
url = prefix + f'{num:03d}' + suffix
page = client.make_get_request(url).json()
next_page, next_cursor = cls._paginated_crawler_parse_metadata(page)
if next_page != url: return page
else: raise RuntimeError("Slip avoiding failed: Not effective")
@classmethod
def _cursor_crawler(cls, client: Client, entity: str, params: dict, save = True):
print(f"Fetching {entity}...")
path = Path(client.path) / ('fetch_' + entity)
items = list()
first_page = client.make_get_request(url=f'{client.config.base_url}/api/v1/{entity}{client.build_query_string(params)}').json()
if first_page.get('items', None): items.extend(first_page.get('items', None))
if save:
path.mkdir(exist_ok=True)
cls._save_json(path / 'first.json', items)
try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(first_page)
except RuntimeError: return items
while next_page:
time.sleep(3)
page = cls._cursor_crawler_avoid_slip(client, next_page, path, entity)
page_items = page.get('items', None)
if not page_items:
with open(Path(client.path)/'bugs.log', 'a') as f: f.write(next_page + '\n')
return items
l = len(items)
items.extend(page_items)
print(f"Fetched {len(items) - l}/{len(page_items)} {entity} from page {next_page}")
if save: cls._save_json(path / f'page_{next_cursor}.json', page_items)
try: next_page, next_cursor = cls._cursor_crawler_parse_metadata(page)
except RuntimeError: break
if save: cls._save_json(path / f'all.json', items)
return items
@classmethod
def models(cls, client: Client, subdir='fetch_models', save=True):
return cls._cursor_crawler(client, 'models', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw':'true'}, save)
@classmethod
def images(cls, client: Client, subdir='fetch_images', save=True, start_with = None):
items = list()
if not start_with: start_with = 0
path = Path(client.path) / ('fetch_' + 'images')
if save: path.mkdir(exist_ok=True)
creators = [c.get('username', None) for c in cls.load(client, 'creators')]
counter = 1 + int(start_with)
for username in creators[int(start_with):]:
# for username in ['yonne']:
time.sleep(3)
if not username: continue
page_items = cls._cursor_crawler(client, 'images', {'period': 'AllTime', 'sort': 'Oldest', 'nsfw':'X', 'username': username, 'limit': '200'}, save=False)
if len(page_items) >= 49000:
with open(path / '_giants.log', 'a') as f: f.write(username + '\n')
print(f'Giant {username} has more then {len(page_items)} images, starting deep scan')
page_items_dict = dict()
for item in page_items: page_items_dict[item['id']] = item
print(f'Transferred {len(page_items_dict)} images of {len(page_items)}')
for sort in ['Newest', 'Most Reactions', 'Most Comments', 'Most Collected']:
page_items = cls._cursor_crawler(client, 'images',
{'period': 'AllTime', 'sort': sort, 'nsfw': 'X',
'username': username, 'limit': '200'}, save=False)
l = len(page_items_dict)
for item in page_items: page_items_dict[item['id']] = item
print(f'Added {len(page_items_dict) - l} images by {sort} sort crawl. {len(page_items_dict)} images total')
page_items = [key for key, value in page_items_dict.items()]
l = len(items)
#items.extend(page_items)
print(f"Fetched {len(page_items)} images by {username} ({counter}/{len(creators)})")
counter = counter + 1
if save: cls._save_json(path / f'{username}.json', page_items)
#if save: cls._save_json(path / 'aaa.json', items)
return items

View File

@@ -1 +1,2 @@
colorama
colorama
requests

View File

@@ -0,0 +1,111 @@
import json
from modules.civit.fetch import Fetch
from shell.Handlers.ABS import Handler
from modules.civit.client import Client
class CivitHandler(Handler):
def __init__(self):
super().__init__()
self.forwarding_table: dict[str, Handler] = {
'fetch': FetchHandler(self)
}
self.handle_table: dict = {
'init': self._init,
}
self.client: Client | None = None
def _init(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['path', 'key'])
self._check_arg(keys, 'path')
self.client = Client(keys['path'], keys['key'])
self.succeed = True
class FetchHandler(Handler):
def __init__(self, parent):
super().__init__()
self.parent: CivitHandler = parent
self.forwarding_table: dict[str, Handler] = {
}
self.handle_table: dict = {
'creators_raw': self._creators_raw,
'creators': self._creators,
'tags_raw': self._tags_raw,
'tags': self._tags,
'models': self._models,
'images': self._images,
'datamodel': self._datamodel,
'load': self._load,
}
def _load(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['entity'])
self._check_arg(keys, 'entity')
res = Fetch.load(self.parent.client, keys['entity'])
for r in res: print(r)
self.succeed = True
def _creators_raw(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query'])
res = self.parent.client.get_creators_raw(page=keys['page'], limit=keys['limit'], query=keys['query'])
print(res)
self.succeed = True
def _creators(self, command: list[str], pos=0):
res = Fetch.creators(self.parent.client)
for r in res: print(r)
self.succeed = True
def _tags_raw(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['page', 'limit', 'query'])
res = self.parent.client.get_tags_raw(page=keys['page'], limit=keys['limit'], query=keys['query'])
print(res)
self.succeed = True
def _tags(self, command: list[str], pos=0):
res = Fetch.tags(self.parent.client)
for r in res: print(r)
self.succeed = True
def _models(self, command: list[str], pos=0):
res = Fetch.models(self.parent.client)
for r in res: print(r)
self.succeed = True
def _images(self, command: list[str], pos=0):
keys, args = self.parse_arguments(command[pos:], ['start'])
res = Fetch.images(self.parent.client, start_with=keys['start'])
for r in res: print(r)
self.succeed = True
def _datamodel(self, command: list[str], pos=0):
entities = {
'creator': 'fetch_creators',
'tag': 'fetch_tags',
'model': 'fetch_models',
'image': 'fetch_images',
}
keys, args = self.parse_arguments(command[pos:], ['entity', 'top', 'dump'])
self._check_arg(keys, 'entity')
if keys['entity'] in entities:
res = Fetch.datamodel(self.parent.client, entities[keys['entity']], keys['top'] or 10)
print(json.dumps(res, indent=2, ensure_ascii=False))
if keys['dump']:
with open(keys['dump'], 'w') as f:
json.dump(res, f, indent=2, ensure_ascii=False)
self.succeed = True

View File

@@ -1,4 +1,5 @@
from shell.Handlers.ABS import Handler
from shell.Handlers.CivitHandler import CivitHandler
from shell.Handlers.PythonappHandler import PythonappHandler
from shell.Handlers.ModelSpaceHandler import ModelSpaceHandler
@@ -9,6 +10,7 @@ class GlobalHandler(Handler):
self.forwarding_table: dict[str, Handler] = {
'pythonapp': PythonappHandler(),
'modelspace': ModelSpaceHandler(),
'civit': CivitHandler(),
}
self.handle_table: dict = {
'tell': self._tell